use super::{ConsoleLayer, Server};
use std::{
net::{SocketAddr, ToSocketAddrs},
path::PathBuf,
thread,
time::Duration,
};
use tokio::runtime;
use tracing::Subscriber;
use tracing_subscriber::{
filter::{self, FilterFn},
layer::{Layer, SubscriberExt},
prelude::*,
registry::LookupSpan,
};
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) event_buffer_capacity: usize,
pub(super) client_buffer_capacity: usize,
pub(crate) publish_interval: Duration,
pub(crate) retention: Duration,
pub(super) server_addr: SocketAddr,
pub(super) recording_path: Option<PathBuf>,
pub(super) filter_env_var: String,
self_trace: bool,
pub(super) poll_duration_max: Duration,
}
impl Default for Builder {
fn default() -> Self {
Self {
event_buffer_capacity: ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY,
client_buffer_capacity: ConsoleLayer::DEFAULT_CLIENT_BUFFER_CAPACITY,
publish_interval: ConsoleLayer::DEFAULT_PUBLISH_INTERVAL,
retention: ConsoleLayer::DEFAULT_RETENTION,
poll_duration_max: ConsoleLayer::DEFAULT_POLL_DURATION_MAX,
server_addr: SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT),
recording_path: None,
filter_env_var: "RUST_LOG".to_string(),
self_trace: false,
}
}
}
impl Builder {
pub fn event_buffer_capacity(self, event_buffer_capacity: usize) -> Self {
Self {
event_buffer_capacity,
..self
}
}
pub fn client_buffer_capacity(self, client_buffer_capacity: usize) -> Self {
Self {
client_buffer_capacity,
..self
}
}
pub fn publish_interval(self, publish_interval: Duration) -> Self {
Self {
publish_interval,
..self
}
}
pub fn retention(self, retention: Duration) -> Self {
Self { retention, ..self }
}
pub fn server_addr(self, server_addr: impl Into<SocketAddr>) -> Self {
Self {
server_addr: server_addr.into(),
..self
}
}
pub fn recording_path(self, path: impl Into<PathBuf>) -> Self {
Self {
recording_path: Some(path.into()),
..self
}
}
pub fn filter_env_var(self, filter_env_var: impl Into<String>) -> Self {
Self {
filter_env_var: filter_env_var.into(),
..self
}
}
pub fn poll_duration_histogram_max(self, max: Duration) -> Self {
Self {
poll_duration_max: max,
..self
}
}
pub fn enable_self_trace(self, self_trace: bool) -> Self {
Self { self_trace, ..self }
}
pub fn build(self) -> (ConsoleLayer, Server) {
ConsoleLayer::build(self)
}
pub fn with_default_env(mut self) -> Self {
if let Some(retention) = duration_from_env("TOKIO_CONSOLE_RETENTION") {
self.retention = retention;
}
if let Ok(bind) = std::env::var("TOKIO_CONSOLE_BIND") {
self.server_addr = bind
.to_socket_addrs()
.expect("TOKIO_CONSOLE_BIND must be formatted as HOST:PORT, such as localhost:4321")
.next()
.expect("tokio console could not resolve TOKIO_CONSOLE_BIND");
}
if let Some(interval) = duration_from_env("TOKIO_CONSOLE_PUBLISH_INTERVAL") {
self.publish_interval = interval;
}
if let Ok(path) = std::env::var("TOKIO_CONSOLE_RECORD_PATH") {
self.recording_path = Some(path.into());
}
self
}
pub fn init(self) {
#[cfg(feature = "env-filter")]
type Filter = filter::EnvFilter;
#[cfg(not(feature = "env-filter"))]
type Filter = filter::Targets;
let fmt_filter = std::env::var(&self.filter_env_var)
.ok()
.and_then(|log_filter| match log_filter.parse::<Filter>() {
Ok(targets) => Some(targets),
Err(e) => {
eprintln!(
"failed to parse filter environment variable `{}={:?}`: {}",
&self.filter_env_var, log_filter, e
);
None
}
})
.unwrap_or_else(|| {
"error"
.parse::<Filter>()
.expect("`error` filter should always parse successfully")
});
let console_layer = self.spawn();
tracing_subscriber::registry()
.with(console_layer)
.with(tracing_subscriber::fmt::layer().with_filter(fmt_filter))
.init();
}
#[must_use = "a `Layer` must be added to a `tracing::Subscriber` in order to be used"]
pub fn spawn<S>(self) -> impl Layer<S>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn console_filter(meta: &tracing::Metadata<'_>) -> bool {
if meta.is_event() {
return meta.target().starts_with("runtime") || meta.target().starts_with("tokio");
}
meta.name().starts_with("runtime.") || meta.target().starts_with("tokio")
}
let self_trace = self.self_trace;
let (layer, server) = self.build();
let filter =
FilterFn::new(console_filter as for<'r, 's> fn(&'r tracing::Metadata<'s>) -> bool);
let layer = layer.with_filter(filter);
thread::Builder::new()
.name("console_subscriber".into())
.spawn(move || {
let _subscriber_guard;
if !self_trace {
_subscriber_guard = tracing::subscriber::set_default(
tracing_core::subscriber::NoSubscriber::default(),
);
}
let runtime = runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.expect("console subscriber runtime initialization failed");
runtime.block_on(async move {
server
.serve()
.await
.expect("console subscriber server failed")
});
})
.expect("console subscriber could not spawn thread");
layer
}
}
pub fn init() {
ConsoleLayer::builder().with_default_env().init();
}
#[must_use = "a `Layer` must be added to a `tracing::Subscriber`in order to be used"]
pub fn spawn<S>() -> impl Layer<S>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
ConsoleLayer::builder().with_default_env().spawn::<S>()
}
fn duration_from_env(var_name: &str) -> Option<Duration> {
let var = std::env::var(var_name).ok()?;
match var.parse::<humantime::Duration>() {
Ok(dur) => Some(dur.into()),
Err(e) => panic!(
"failed to parse a duration from `{}={:?}`: {}",
var_name, var, e
),
}
}