telemetry_subscribers/lib.rs
// Copyright (c) 2023, Evan Chan
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! # Telemetry-subscribers
//!
//! This is a library for common telemetry functionality, especially subscribers for the
//! [Tokio tracing](https://github.com/tokio-rs/tracing) libraries. It packages many common subscribers,
//! such as writing trace data to Jaeger, distributed tracing, and common log and metrics destinations,
//! into one easy-to-configure crate. There are also some unique layers, such as one that automatically
//! creates Prometheus latency histograms for spans.
//!
//! Logging levels are purposely kept separate from span creation. Production apps often need this:
//! logging at very verbose levels is usually undesirable, but it is still useful to gather sampled
//! span data all the way down to TRACE level spans.
//!
//! Getting started is easy. In your app:
//!
//! ```rust
//! use telemetry_subscribers::TelemetryConfig;
//! let (_guard, _handle) = TelemetryConfig::new("my_app").init();
//! ```
//!
//! It is important to retain the guard until the end of the program. Assign it in the main fn and keep it,
//! because log output stops once it is dropped.
//!
//! There is a builder API available: start with `TelemetryConfig::new("my_app")` and chain the `with_*`
//! methods. Another convenient initialization method is `TelemetryConfig::new("my_app").with_env()`,
//! which populates the config from environment vars.
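/*!
As a rough illustration of how `with_env()` treats its on/off switches: in this file each one is
checked with `env::var(NAME).is_ok()`, so the variable only has to be present, and even a value
such as `false` or an empty string turns the option on. The sketch below models that check with
the standard library only. `flag_enabled` and the map-backed lookup are hypothetical helpers made
up for this illustration; they are not part of this crate.

```rust
use std::collections::HashMap;

/// Returns true when `name` is present, whatever its value.
/// This mirrors the `env::var(name).is_ok()` check used by `with_env()`.
fn flag_enabled(lookup: impl Fn(&str) -> Option<String>, name: &str) -> bool {
    lookup(name).is_some()
}

fn main() {
    // A stand-in environment so the example is deterministic.
    let mut fake_env = HashMap::new();
    fake_env.insert("CRASH_ON_PANIC", "false");
    fake_env.insert("ENABLE_JSON_LOGS", "");
    let lookup = |key: &str| fake_env.get(key).map(|v| v.to_string());

    // Present variables enable the option regardless of their value.
    assert!(flag_enabled(&lookup, "CRASH_ON_PANIC"));
    assert!(flag_enabled(&lookup, "ENABLE_JSON_LOGS"));
    // Absent variables leave the option at its default.
    assert!(!flag_enabled(&lookup, "TOKIO_JAEGER"));

    // Against the real process environment, the lookup would be:
    let _real = flag_enabled(|key| std::env::var(key).ok(), "TOKIO_CONSOLE");
}
```

In practice this means a switch is turned off by unsetting the variable, not by assigning it a
falsy value.
*/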
//!
//! You can also run the example and see output in ANSI color:
//!
//! ```bash
//! cargo run --example easy-init
//! ```
//!
//! ## Features
//! - `jaeger` - enables Jaeger tracing; this feature is enabled by default
//! - `json` - Bunyan formatter for JSON log output, optional
//! - `tokio-console` - [Tokio-console](https://github.com/tokio-rs/console) subscriber, optional
//! - `chrome` - enables use of `chrome://tracing` to visualize output, optional
//!
//! ### Stderr vs file output
//!
//! By default, logs (but not spans) are formatted for human readability and output to stderr, with key-value tags at the end of every line.
//! `RUST_LOG` can be configured for custom logging output, including filtering.
//!
//! By setting `log_file` in the config, one can write log output to a daily-rotated file instead.
//!
//! ### Tracing and span output
//!
//! Detailed span start and end logs can be generated in two ways:
//! * by defining the `span_log_output` config variable / `ENABLE_SPAN_LOGS` env var. It is false by default since generating more logs does have a perf impact.
//! * by defining the `json_log_output` config variable / `ENABLE_JSON_LOGS` env var. Note that this causes all output to be in JSON format, which is not as human-readable, so it is not enabled by default.
//!
//! JSON output can easily be fed to backends such as ElasticSearch for indexing, alerts, aggregation, and analysis.
//! It requires the `json` crate feature to be enabled.
//!
//! ### Jaeger (seeing distributed traces)
//!
//! To see nested spans visualized with [Jaeger](https://www.jaegertracing.io), do the following:
//!
//! 1. Run this to get a local Jaeger container: `docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest`
//! 2. Set the `enable_jaeger` config setting to true, or set the `TOKIO_JAEGER` env var when using `with_env()`
//! 3. Run your app
//! 4. Browse to `http://localhost:16686/` and select the service you configured using `service_name`
//!
//! NOTE: separate spans (which are not nested) are not connected as a single trace for now.
//!
//! The Jaeger subscriber is compiled in by default, behind the `jaeger` feature flag. If you'd like to leave
//! out the Jaeger dependencies, you can turn off default features in your dependency:
//!
//! ```toml
//! telemetry = { git = "...", default-features = false }
//! ```
//!
//! ### Automatic Prometheus span latencies
//!
//! Included in this library is a tracing-subscriber layer named `PrometheusSpanLatencyLayer`. It creates
//! a Prometheus histogram to track latencies for every span in your app, which is super convenient for tracking
//! span performance in production apps.
//!
//! Enabling this layer can only be done programmatically, by passing a Prometheus registry to
//! `TelemetryConfig::with_prom_registry`.
//!
//! ### Span levels vs log levels
//!
//! Which spans are included for Jaeger output, automatic span latencies, etc.? These are controlled by
//! the `span_level` config attribute, or the `TOKIO_SPAN_LEVEL` environment variable. If neither is set,
//! the span level defaults to INFO. Note that this is separate from `RUST_LOG`, so that you can control
//! the logging verbosity separately from the level of spans that are to be recorded and traced.
//!
//! Note that span levels for regular logging output are not affected by the span level config.
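/*!
The rule applied by the span filter in this file is
`metadata.is_span() && *metadata.level() <= span_level`. The following is a minimal,
standard-library-only model of that comparison. The `Verbosity` enum and `span_recorded` function
are hypothetical stand-ins for `tracing::Level` and the real filter, ordered the same way `tracing`
orders levels (ERROR lowest, TRACE highest).

```rust
/// Stand-in for `tracing::Level`; variants are declared from least to most
/// verbose so the derived ordering matches `tracing` (ERROR < ... < TRACE).
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Verbosity {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

/// A span is recorded when its level is at or below the configured span level.
fn span_recorded(span: Verbosity, span_level: Verbosity) -> bool {
    span <= span_level
}

fn main() {
    // With the span level set to DEBUG, DEBUG and INFO spans are kept...
    assert!(span_recorded(Verbosity::Debug, Verbosity::Debug));
    assert!(span_recorded(Verbosity::Info, Verbosity::Debug));
    // ...while TRACE spans are dropped.
    assert!(!span_recorded(Verbosity::Trace, Verbosity::Debug));
    // With the default span level (INFO), DEBUG spans are dropped as well.
    assert!(!span_recorded(Verbosity::Debug, Verbosity::Info));
}
```

Because this check is independent of `RUST_LOG`, a DEBUG span can be exported to Jaeger or
measured by the latency layer even while the log output stays at INFO.
*/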
//!
//! ### Live async inspection / Tokio Console
//!
//! [Tokio-console](https://github.com/tokio-rs/console) is an awesome CLI tool designed to analyze and help debug Rust apps using Tokio, in real time! It relies on a special subscriber.
//!
//! 1. Build your app using a special flag: `RUSTFLAGS="--cfg tokio_unstable" cargo build`
//! 2. Enable the `tokio-console` feature for this crate.
//! 3. Set the `tokio_console` config setting when running your app (or set the `TOKIO_CONSOLE` env var if using the `with_env()` config method)
//! 4. Clone the console repo and `cargo run` to launch the console
//!
//! NOTE: setting Tokio TRACE logs is NOT necessary. The console docs say otherwise, but there's no need to change Tokio logging levels at all. The console subscriber has a special filter enabled that takes care of that.
//!
//! By default, Tokio console listens on port 6669. To change this setting, as well as other settings such as
//! the retention policy, please see the [configuration](https://docs.rs/console-subscriber/latest/console_subscriber/struct.Builder.html#configuration) guide.
//!
//! ### Custom panic hook
//!
//! This library installs a custom panic hook which records a log (event) at ERROR level using the tracing
//! crate. This allows span information from the panic to be properly recorded as well.
//!
//! To exit the process on panic, set the `CRASH_ON_PANIC` environment variable (read by `with_env()`).
//! The process then exits with status 12 once the panic has been logged.
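/*!
The hook chaining described above can be sketched with the standard library alone. This is only
an illustration under stated assumptions: `RECORDED` and `install_recording_hook` are hypothetical
names, and pushing to a `Vec` stands in for the `tracing::error!` event that this crate emits.

```rust
use std::panic;
use std::sync::Mutex;

/// Panic reports captured by the hook; a stand-in for the `tracing::error!` event.
static RECORDED: Mutex<Vec<String>> = Mutex::new(Vec::new());

/// Installs a hook that records the panic, then delegates to the previous hook.
fn install_recording_hook() {
    let previous = panic::take_hook();
    panic::set_hook(Box::new(move |info| {
        // Record the panic first, using its `Display` output (location plus message).
        RECORDED.lock().unwrap().push(info.to_string());
        // Then call the previously installed hook so the usual report is still printed.
        previous(info);
    }));
}

fn main() {
    install_recording_hook();
    // `catch_unwind` keeps the example alive; the hook runs before unwinding starts.
    let result = panic::catch_unwind(|| panic!("boom"));
    assert!(result.is_err());
    assert_eq!(RECORDED.lock().unwrap().len(), 1);
}
```

Taking the previous hook before installing the new one is what preserves the default panic
message on stderr; the hook in this crate does the same, and additionally flushes stderr and
stdout before optionally exiting.
*/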

use span_latency_prom::PrometheusSpanLatencyLayer;
use std::{
    env,
    io::{stderr, Write},
    str::FromStr,
};
use tracing::metadata::LevelFilter;
use tracing::Level;
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::{
    filter::{self, FilterExt},
    fmt::{self, format::FmtSpan},
    layer::{Filter, SubscriberExt},
    reload,
    util::SubscriberInitExt,
    EnvFilter, Layer, Registry,
};

use crossterm::tty::IsTty;

pub mod span_latency_prom;

/// Alias for a type-erased error type.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Configuration for different logging/tracing options.
///
/// - `json_log_output`: output logs as JSON.
/// - `log_file`: if defined, write output to a file starting with this name, e.g. `app.log`.
/// - `log_string`: log filter directives (error/warn/info/debug/trace), defaults to `info`.
/// - `service_name`: the name of the service, as used for Jaeger and Bunyan.
#[derive(Default)]
pub struct TelemetryConfig {
    /// The name of the service for Jaeger and Bunyan
    pub service_name: String,

    /// Enables sending spans to Jaeger (only has an effect with the `jaeger` feature)
    pub enable_jaeger: bool,
    /// Enables Tokio Console debugging on port 6669
    pub tokio_console: bool,
    /// Output JSON logs.
    pub json_log_output: bool,
    /// Output span start/close to regular logs, controlled using regular RUST_LOG levels
    pub span_log_output: bool,
    /// Write chrome trace output, which can be loaded from chrome://tracing
    pub chrome_trace_output: bool,
    /// If defined, write output to a file starting with this name, e.g. app.log
    pub log_file: Option<String>,
    /// Log level to set, defaults to info. `RUST_LOG`, when set and valid, takes precedence.
    pub log_string: Option<String>,
    /// Span level - what level of spans should be created. Note this is not the same as the logging level.
    /// If set to None, then defaults to INFO
    pub span_level: Option<Level>,
    /// Set a panic hook
    pub panic_hook: bool,
    /// Crash on panic
    pub crash_on_panic: bool,
    /// Optional Prometheus registry - if present, all enabled span latencies are measured
    pub prom_registry: Option<prometheus::Registry>,
    /// Pass in any additional custom layers that the consumer wants their subscriber to be created with
    pub custom_layers: Vec<Box<dyn Layer<Registry> + Send + Sync + 'static>>,
    /// Pass in any custom filter that will be applied along with the default filter.
    /// Note that `init` currently applies it only when `json_log_output` is set.
    pub custom_filter: Option<Box<dyn Filter<Registry> + Send + Sync>>,
}

/// Guards returned by [`TelemetryConfig::init`]. Keep this value alive until the end of the
/// program: dropping it flushes and closes the log output.
#[must_use]
#[allow(dead_code)]
pub struct TelemetryGuards {
    worker_guard: WorkerGuard,

    #[cfg(feature = "chrome")]
    chrome_guard: Option<tracing_chrome::FlushGuard>,
}

/// Handle for inspecting and replacing the log filter at runtime.
#[derive(Clone, Debug)]
pub struct FilterHandle(reload::Handle<EnvFilter, Registry>);

impl FilterHandle {
    /// Replaces the current log filter with one parsed from `directives` (`RUST_LOG` syntax).
    pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
        let filter = EnvFilter::try_new(directives)?;
        self.0.reload(filter)?;
        Ok(())
    }

    /// Returns the directives of the current log filter as a string.
    pub fn get(&self) -> Result<String, BoxError> {
        self.0
            .with_current(|filter| filter.to_string())
            .map_err(Into::into)
    }
}

/// Returns a non-blocking writer for log output: a daily-rotated file when `log_file` is set,
/// otherwise stderr. The returned guard flushes pending output when dropped.
fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
    if let Some(logfile_prefix) = log_file {
        let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
        tracing_appender::non_blocking(file_appender)
    } else {
        tracing_appender::non_blocking(stderr())
    }
}

// NOTE: this function is copied from tracing's panic_hook example
fn set_panic_hook(crash_on_panic: bool) {
    let default_panic_handler = std::panic::take_hook();

    // Set a panic hook that records the panic as a `tracing` event at the
    // `ERROR` verbosity level.
    //
    // If we are currently in a span when the panic occurred, the logged event
    // will include the current span, allowing the context in which the panic
    // occurred to be recorded.
    std::panic::set_hook(Box::new(move |panic| {
        // If the panic has a source location, record it as structured fields.
        if let Some(location) = panic.location() {
            // On nightly Rust, where the `PanicInfo` type also exposes a
            // `message()` method returning just the message, we could record
            // just the message instead of the entire `fmt::Display`
            // implementation, avoiding the duplicated location
            tracing::error!(
                message = %panic,
                panic.file = location.file(),
                panic.line = location.line(),
                panic.column = location.column(),
            );
        } else {
            tracing::error!(message = %panic);
        }

        default_panic_handler(panic);

        // We're panicking so we can't do anything about the flush failing
        let _ = std::io::stderr().flush();
        let _ = std::io::stdout().flush();

        if crash_on_panic {
            // Kill the process
            std::process::exit(12);
        }
    }));
}

impl TelemetryConfig {
    /// Creates a config with defaults: everything optional is off, and the panic hook is on.
    pub fn new(service_name: &str) -> Self {
        Self {
            service_name: service_name.to_owned(),
            enable_jaeger: false,
            tokio_console: false,
            json_log_output: false,
            span_log_output: false,
            chrome_trace_output: false,
            log_file: None,
            log_string: None,
            span_level: None,
            panic_hook: true,
            crash_on_panic: false,
            prom_registry: None,
            custom_layers: Vec::new(),
            custom_filter: None,
        }
    }

    /// Sets the log filter directives used when `RUST_LOG` is not set.
    pub fn with_log_level(mut self, log_string: &str) -> Self {
        self.log_string = Some(log_string.to_owned());
        self
    }

    /// Sets the most verbose level of spans to record.
    pub fn with_span_level(mut self, span_level: Level) -> Self {
        self.span_level = Some(span_level);
        self
    }

    /// Writes log output to a daily-rotated file starting with `filename`.
    pub fn with_log_file(mut self, filename: &str) -> Self {
        self.log_file = Some(filename.to_owned());
        self
    }

    /// Enables the span latency layer, registering its histogram in `registry`.
    pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
        self.prom_registry = Some(registry.clone());
        self
    }

    /// Adds a custom layer to the subscriber.
    pub fn with_layer<L>(mut self, layer: L) -> Self
    where
        L: Layer<Registry> + Send + Sync + 'static,
    {
        self.custom_layers.push(Box::new(layer));
        self
    }

    /// Switches log output to JSON.
    pub fn with_json_logs(mut self) -> Self {
        self.json_log_output = true;
        self
    }

    /// Sets a custom filter to apply along with the default log filter.
    pub fn with_custom_filter<F>(mut self, filter: F) -> Self
    where
        F: Filter<Registry> + Send + Sync + 'static,
    {
        self.custom_filter = Some(Box::new(filter));
        self
    }

    /// Populates the config from environment variables. The on/off switches only need to be
    /// present (any value enables them); `TOKIO_SPAN_LEVEL` and `RUST_LOG_FILE` carry values.
    pub fn with_env(mut self) -> Self {
        if env::var("CRASH_ON_PANIC").is_ok() {
            self.crash_on_panic = true;
        }

        if env::var("TOKIO_JAEGER").is_ok() {
            self.enable_jaeger = true;
        }

        if env::var("TOKIO_CHROME").is_ok() {
            self.chrome_trace_output = true;
        }

        if env::var("ENABLE_JSON_LOGS").is_ok() {
            self.json_log_output = true;
        }

        if env::var("ENABLE_SPAN_LOGS").is_ok() {
            self.span_log_output = true;
        }

        if env::var("TOKIO_CONSOLE").is_ok() {
            self.tokio_console = true;
        }

        if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
            self.span_level =
                Some(Level::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
        }

        if let Ok(filepath) = env::var("RUST_LOG_FILE") {
            self.log_file = Some(filepath);
        }

        self
    }

    /// Builds and installs the global subscriber from this config. Returns the guards, which
    /// must be kept alive, and a handle for changing the log filter at runtime.
    pub fn init(self) -> (TelemetryGuards, FilterHandle) {
        let config = self;

        // Set up an EnvFilter for filtering logging output layers.
        // NOTE: we don't want to use this to filter all layers. That causes problems for layers with
        // different filtering needs, including tokio-console/console-subscriber, and it also doesn't
        // fit with the span creation needs for distributed tracing and other span-based tools.
        let log_level = config.log_string.unwrap_or_else(|| "info".into());
        let env_filter =
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(log_level));
        let (log_filter, reload_handle) = reload::Layer::new(env_filter);
        let filter_handle = FilterHandle(reload_handle);

        // Separate span level filter.
        // This is a simple filter for now: it allows all spans at or below a given level.
        // TODO: implement a sampling filter
        let span_level = config.span_level.unwrap_or(Level::INFO);
        let span_filter = filter::filter_fn(move |metadata| {
            metadata.is_span() && *metadata.level() <= span_level
        });

        let mut layers = Vec::new();

        // tokio-console layer
        // Please see https://docs.rs/console-subscriber/latest/console_subscriber/struct.Builder.html#configuration
        // for environment vars/config options
        #[cfg(feature = "tokio-console")]
        if config.tokio_console {
            layers.push(console_subscriber::spawn().boxed());
        }

        #[cfg(feature = "chrome")]
        let chrome_guard = if config.chrome_trace_output {
            let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new().build();
            layers.push(chrome_layer.boxed());
            Some(guard)
        } else {
            None
        };

        if let Some(registry) = config.prom_registry {
            let span_lat_layer = PrometheusSpanLatencyLayer::try_new(&registry, 15)
                .expect("Could not initialize span latency layer");
            layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
        }

        #[cfg(feature = "jaeger")]
        if config.enable_jaeger {
            // Install a tracer to send traces to Jaeger. Batching for better performance.
            let tracer = opentelemetry_jaeger::new_agent_pipeline()
                .with_service_name(&config.service_name)
                .with_max_packet_size(9216) // Default max UDP packet size on OSX
                .with_auto_split_batch(true) // Auto split batches so they fit under packet size
                .install_batch(opentelemetry::runtime::Tokio)
                .expect("Could not create async Tracer");

            // Create a tracing subscriber with the configured tracer
            let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

            // Enable Trace Contexts for tying spans together
            opentelemetry::global::set_text_map_propagator(
                opentelemetry::sdk::propagation::TraceContextPropagator::new(),
            );

            layers.push(telemetry.with_filter(span_filter.clone()).boxed());
        }

        let (nb_output, worker_guard) = get_output(config.log_file.clone());

        // Create the base format layer
        let fmt_layer = fmt::layer().with_writer(nb_output);
        let fmt_layer = if config.json_log_output {
            let layer = fmt_layer.json().flatten_event(true);
            if let Some(custom_filter) = config.custom_filter {
                layer.with_filter(log_filter.and(custom_filter)).boxed()
            } else {
                layer.with_filter(log_filter).boxed()
            }
        } else {
            let span_events = if config.span_log_output {
                FmtSpan::NEW | FmtSpan::CLOSE
            } else {
                FmtSpan::NONE
            };
            fmt_layer
                .with_ansi(config.log_file.is_none() && stderr().is_tty())
                .with_span_events(span_events)
                .with_filter(log_filter)
                .boxed()
        };
        layers.push(fmt_layer);

        // Add custom layers
        for layer in config.custom_layers {
            layers.push(layer);
        }

        tracing_subscriber::registry().with(layers).init();

        if config.panic_hook {
            set_panic_hook(config.crash_on_panic);
        }

        // The guards must be returned and kept in the main fn of the app: when they are dropped, the
        // output gets flushed and closed. If they are dropped too early, no output will appear!
        let guards = TelemetryGuards {
            worker_guard,
            #[cfg(feature = "chrome")]
            chrome_guard,
        };

        (guards, filter_handle)
    }
}

/// Globally set a tracing subscriber suitable for testing environments
pub fn init_for_testing() {
    use once_cell::sync::Lazy;

    static LOGGER: Lazy<()> = Lazy::new(|| {
        let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::INFO.into())
                    .from_env_lossy(),
            )
            .with_file(true)
            .with_line_number(true)
            .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
            .with_test_writer()
            .finish();
        ::tracing::subscriber::set_global_default(subscriber)
            .expect("unable to initialize logging for tests");
    });

    Lazy::force(&LOGGER);
}

#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::proto::MetricType;
    use std::time::Duration;
    use tracing::{debug, debug_span, info, trace_span, warn};

    #[test]
    #[should_panic]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        // Default logging level is INFO, but here we set the span level to DEBUG. TRACE spans should be ignored.
        let config = TelemetryConfig::new("my_app")
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        // Spans are debug level or below, so they won't be printed out either. However, latencies
        // should be recorded for at least one span
        debug_span!("yo span yo").in_scope(|| {
            // This debug log will not print out, log level set to INFO by default
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        // This span won't be enabled
        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        let metrics = registry.gather();
        // There should be 1 metricFamily and 1 metric
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].get_name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].get_name(), "span_name");
        assert_eq!(labels[0].get_value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    /*
    Both the following tests should be able to "race" to initialize logging without causing a
    panic
    */
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}