use std::{
fs::{self, File},
io::Write,
};
use abscissa_core::{Component, FrameworkError, Shutdown};
use tokio::sync::watch;
use tracing::{field::Visit, Level};
use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
use tracing_error::ErrorLayer;
use tracing_subscriber::{
fmt::{format, Formatter},
layer::SubscriberExt,
reload::Handle,
util::SubscriberInitExt,
EnvFilter, Layer,
};
use zebra_chain::parameters::Network;
use crate::{application::build_version, components::tracing::Config};
#[cfg(feature = "flamegraph")]
use super::flame;
static ZEBRA_ART: [u8; include_bytes!("zebra.utf8").len()] = *include_bytes!("zebra.utf8");
pub type BoxWrite = Box<dyn Write + Send + Sync + 'static>;
pub struct Tracing {
filter_handle: Option<
Handle<
EnvFilter,
Formatter<format::DefaultFields, format::Format<format::Full>, NonBlocking>,
>,
>,
initial_filter: String,
#[cfg(feature = "flamegraph")]
flamegrapher: Option<flame::Grapher>,
#[cfg(feature = "opentelemetry")]
otel_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
_guard: Option<WorkerGuard>,
}
impl Tracing {
#[allow(clippy::print_stdout, clippy::print_stderr, clippy::unwrap_in_result)]
pub fn new(
network: &Network,
config: Config,
uses_intro: bool,
) -> Result<Self, FrameworkError> {
let use_color = config.use_color_stdout();
let use_color_stderr = config.use_color_stderr();
let filter = config.filter.clone().unwrap_or_default();
let flame_root = &config.flamegraph;
if uses_intro && !network.is_regtest() {
if use_color_stderr {
eprint!("\x1B[2J");
eprintln!(
"{}",
std::str::from_utf8(&ZEBRA_ART)
.expect("should always work on a UTF-8 encoded constant")
);
}
eprintln!(
"Thank you for running a {} zebrad {} node!",
network.lowercase_name(),
build_version()
);
eprintln!(
"You're helping to strengthen the network and contributing to a social good :)"
);
}
let writer = if let Some(log_file) = config.log_file.as_ref() {
let log_file_dir = log_file.parent();
if let Some(log_file_dir) = log_file_dir {
if !log_file_dir.exists() {
eprintln!("Directory for log file {log_file:?} does not exist, trying to create it...");
if let Err(create_dir_error) = fs::create_dir_all(log_file_dir) {
eprintln!("Failed to create directory for log file: {create_dir_error}");
eprintln!("Trying log file anyway...");
}
}
}
if uses_intro {
println!("Sending logs to {log_file:?}...");
}
let log_file = File::options().append(true).create(true).open(log_file)?;
Box::new(log_file) as BoxWrite
} else {
let stdout = std::io::stdout();
Box::new(stdout) as BoxWrite
};
let (non_blocking, worker_guard) = NonBlockingBuilder::default()
.buffered_lines_limit(config.buffer_limit.max(100))
.finish(writer);
#[cfg(not(all(feature = "tokio-console", tokio_unstable)))]
let (subscriber, filter_handle) = {
use tracing_subscriber::FmtSubscriber;
let logger = FmtSubscriber::builder()
.with_ansi(use_color)
.with_writer(non_blocking)
.with_env_filter(&filter);
#[cfg(feature = "filter-reload")]
let (filter_handle, logger) = {
let logger = logger.with_filter_reloading();
(Some(logger.reload_handle()), logger)
};
#[cfg(not(feature = "filter-reload"))]
let filter_handle = None;
let warn_error_layer = LastWarnErrorLayer {
last_warn_error_sender: crate::application::LAST_WARN_ERROR_LOG_SENDER.clone(),
};
let subscriber = logger
.finish()
.with(warn_error_layer)
.with(ErrorLayer::default());
(subscriber, filter_handle)
};
#[cfg(all(feature = "tokio-console", tokio_unstable))]
let (subscriber, filter_handle) = {
use tracing_subscriber::{fmt, Layer};
let subscriber = tracing_subscriber::registry();
let logger = fmt::Layer::new()
.with_ansi(use_color)
.with_writer(non_blocking)
.with_filter(EnvFilter::from(&filter));
let subscriber = subscriber.with(logger);
let span_logger = ErrorLayer::default().with_filter(EnvFilter::from(&filter));
let subscriber = subscriber.with(span_logger);
(subscriber, None)
};
#[cfg(feature = "flamegraph")]
let (flamelayer, flamegrapher) = if let Some(path) = flame_root {
let (flamelayer, flamegrapher) = flame::layer(path);
(Some(flamelayer), Some(flamegrapher))
} else {
(None, None)
};
#[cfg(feature = "flamegraph")]
let subscriber = subscriber.with(flamelayer);
#[cfg(feature = "journald")]
let journaldlayer = if config.use_journald {
use abscissa_core::FrameworkErrorKind;
let layer = tracing_journald::layer()
.map_err(|e| FrameworkErrorKind::ComponentError.context(e))?;
#[cfg(all(feature = "tokio-console", tokio_unstable))]
let layer = {
use tracing_subscriber::Layer;
layer.with_filter(EnvFilter::from(&filter))
};
Some(layer)
} else {
None
};
#[cfg(feature = "journald")]
let subscriber = subscriber.with(journaldlayer);
#[cfg(feature = "sentry")]
let subscriber = subscriber.with(sentry::integrations::tracing::layer());
#[cfg(feature = "opentelemetry")]
let (otel_layer, otel_provider, otel_resolved_config) = {
let endpoint = config
.opentelemetry_endpoint
.clone()
.or_else(|| std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok());
let service_name = config
.opentelemetry_service_name
.clone()
.or_else(|| std::env::var("OTEL_SERVICE_NAME").ok());
let sample_percent = config.opentelemetry_sample_percent.or_else(|| {
std::env::var("OTEL_TRACES_SAMPLER_ARG")
.ok()
.and_then(|s| s.parse().ok())
});
let resolved_config = (
endpoint.clone(),
service_name.clone().unwrap_or_else(|| "zebra".to_string()),
sample_percent.unwrap_or(100),
);
match super::otel::layer(endpoint.as_deref(), service_name.as_deref(), sample_percent) {
Ok((layer, provider)) => (layer, provider, resolved_config),
Err(e) => {
tracing::warn!(
?e,
"failed to initialize OpenTelemetry, traces will not be exported"
);
(None, None, resolved_config)
}
}
};
#[cfg(feature = "opentelemetry")]
let subscriber = subscriber.with(otel_layer);
#[cfg(all(feature = "tokio-console", tokio_unstable))]
let subscriber = subscriber.with(console_subscriber::spawn());
subscriber.init();
tracing::info!(
?filter,
TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
"started tracing component",
);
if flame_root.is_some() {
if cfg!(feature = "flamegraph") {
info!(flamegraph = ?flame_root, "installed flamegraph tracing layer");
} else {
warn!(
flamegraph = ?flame_root,
"unable to activate configured flamegraph: \
enable the 'flamegraph' feature when compiling zebrad",
);
}
}
if config.use_journald {
if cfg!(feature = "journald") {
info!("installed journald tracing layer");
} else {
warn!(
"unable to activate configured journald tracing: \
enable the 'journald' feature when compiling zebrad",
);
}
}
#[cfg(feature = "sentry")]
info!("installed sentry tracing layer");
#[cfg(all(feature = "tokio-console", tokio_unstable))]
info!(
TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
"installed tokio-console tracing layer",
);
#[cfg(feature = "opentelemetry")]
if otel_provider.is_some() {
let (ref endpoint, ref service_name, sample_percent) = otel_resolved_config;
info!(
?endpoint,
%service_name,
sample_percent,
"installed OpenTelemetry tracing layer",
);
}
#[cfg(not(feature = "opentelemetry"))]
if config.opentelemetry_endpoint.is_some() {
warn!(
endpoint = ?config.opentelemetry_endpoint,
"unable to activate OpenTelemetry tracing: \
enable the 'opentelemetry' feature when compiling zebrad",
);
}
#[cfg(feature = "progress-bar")]
if let Some(progress_bar_config) = config.progress_bar.as_ref() {
use howudoin::consumers::TermLine;
use std::time::Duration;
const PROGRESS_BAR_DEBOUNCE: Duration = Duration::from_secs(2);
let terminal_consumer = TermLine::with_debounce(PROGRESS_BAR_DEBOUNCE);
howudoin::init(terminal_consumer);
info!(?progress_bar_config, "activated progress bars");
} else {
info!(
"set 'tracing.progress_bar =\"summary\"' in zebrad.toml to activate progress bars"
);
}
Ok(Self {
filter_handle,
initial_filter: filter,
#[cfg(feature = "flamegraph")]
flamegrapher,
#[cfg(feature = "opentelemetry")]
otel_provider,
_guard: Some(worker_guard),
})
}
pub fn shutdown(&mut self) {
self.filter_handle.take();
#[cfg(feature = "flamegraph")]
self.flamegrapher.take();
#[cfg(feature = "opentelemetry")]
if let Some(provider) = self.otel_provider.take() {
if let Err(e) = provider.shutdown() {
tracing::warn!(?e, "OpenTelemetry shutdown error");
}
}
self._guard.take();
}
pub fn filter(&self) -> String {
if let Some(filter_handle) = self.filter_handle.as_ref() {
filter_handle
.with_current(|filter| filter.to_string())
.expect("the subscriber is not dropped before the component is")
} else {
self.initial_filter.clone()
}
}
pub fn reload_filter(&self, filter: impl Into<EnvFilter>) {
let filter = filter.into();
if let Some(filter_handle) = self.filter_handle.as_ref() {
tracing::info!(
?filter,
TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
"reloading tracing filter",
);
filter_handle
.reload(filter)
.expect("the subscriber is not dropped before the component is");
} else {
tracing::warn!(
?filter,
TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
"attempted to reload tracing filter, but filter reloading is disabled",
);
}
}
}
impl std::fmt::Debug for Tracing {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Tracing").finish()
}
}
impl<A: abscissa_core::Application> Component<A> for Tracing {
fn id(&self) -> abscissa_core::component::Id {
abscissa_core::component::Id::new("zebrad::components::tracing::component::Tracing")
}
fn version(&self) -> abscissa_core::Version {
build_version()
}
fn before_shutdown(&self, _kind: Shutdown) -> Result<(), FrameworkError> {
#[cfg(feature = "flamegraph")]
if let Some(ref grapher) = self.flamegrapher {
use abscissa_core::FrameworkErrorKind;
info!("writing flamegraph");
grapher
.write_flamegraph()
.map_err(|e| FrameworkErrorKind::ComponentError.context(e))?
}
#[cfg(feature = "progress-bar")]
howudoin::disable();
Ok(())
}
}
impl Drop for Tracing {
fn drop(&mut self) {
#[cfg(feature = "progress-bar")]
howudoin::disable();
}
}
struct MessageVisitor {
message: Option<String>,
}
impl Visit for MessageVisitor {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if field.name() == "message" {
self.message = Some(format!("{value:?}"));
}
}
}
#[derive(Debug, Clone)]
struct LastWarnErrorLayer {
last_warn_error_sender: watch::Sender<Option<(String, Level, chrono::DateTime<chrono::Utc>)>>,
}
impl<S> Layer<S> for LastWarnErrorLayer
where
S: tracing::Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
{
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let level = *event.metadata().level();
let timestamp = chrono::Utc::now();
if level == Level::WARN || level == Level::ERROR {
let mut visitor = MessageVisitor { message: None };
event.record(&mut visitor);
if let Some(message) = visitor.message {
let _ = self
.last_warn_error_sender
.send(Some((message, level, timestamp)));
}
}
}
}