use super::{ConsoleLayer, Server};
#[cfg(unix)]
use std::path::Path;
use std::{
net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs},
path::PathBuf,
thread,
time::Duration,
};
use tokio::runtime;
use tracing::Subscriber;
use tracing_subscriber::{
filter::{self, FilterFn},
layer::{Layer, SubscriberExt},
prelude::*,
registry::LookupSpan,
};
/// Configuration for a [`ConsoleLayer`] and its [`Server`].
///
/// Values start from [`Builder::default`], can be overridden through the
/// setter methods or from environment variables (`with_default_env`), and are
/// finally consumed by `build`, `spawn` or `init`.
#[derive(Clone, Debug)]
pub struct Builder {
/// Event buffer capacity. Defaults to
/// `ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY`; `with_default_env`
/// overrides it from `TOKIO_CONSOLE_BUFFER_CAPACITY`.
pub(super) event_buffer_capacity: usize,
/// Client buffer capacity. Defaults to
/// `ConsoleLayer::DEFAULT_CLIENT_BUFFER_CAPACITY`.
pub(super) client_buffer_capacity: usize,
/// Publish interval. Defaults to `ConsoleLayer::DEFAULT_PUBLISH_INTERVAL`;
/// `with_default_env` overrides it from `TOKIO_CONSOLE_PUBLISH_INTERVAL`.
pub(crate) publish_interval: Duration,
/// Retention duration. Defaults to `ConsoleLayer::DEFAULT_RETENTION`;
/// `with_default_env` overrides it from `TOKIO_CONSOLE_RETENTION`.
pub(crate) retention: Duration,
/// Address for the console server. Defaults to a TCP address built from
/// `Server::DEFAULT_IP` and `Server::DEFAULT_PORT`; `with_default_env`
/// overrides it from `TOKIO_CONSOLE_BIND`.
pub(super) server_addr: ServerAddr,
/// Optional recording path. Defaults to `None`; `with_default_env`
/// overrides it from `TOKIO_CONSOLE_RECORD_PATH`.
pub(super) recording_path: Option<PathBuf>,
/// Name of the environment variable that `init` parses to build the filter
/// for the `fmt` layer. Defaults to `"RUST_LOG"`.
pub(super) filter_env_var: String,
/// When `false` (the default), `spawn` installs a `NoSubscriber` as the
/// default subscriber on the server thread.
self_trace: bool,
/// Set through `poll_duration_histogram_max`. Defaults to
/// `ConsoleLayer::DEFAULT_POLL_DURATION_MAX`.
// NOTE(review): presumably the upper bound of the poll-duration histogram — confirm in `ConsoleLayer`.
pub(super) poll_duration_max: Duration,
/// Set through `scheduled_duration_histogram_max`. Defaults to
/// `ConsoleLayer::DEFAULT_SCHEDULED_DURATION_MAX`.
// NOTE(review): presumably the upper bound of the scheduled-duration histogram — confirm in `ConsoleLayer`.
pub(super) scheduled_duration_max: Duration,
/// When `true`, `spawn` runs the server through `serve_with_grpc_web`
/// instead of `serve`. Only present with the `grpc-web` feature; defaults
/// to `false`.
#[cfg(feature = "grpc-web")]
enable_grpc_web: bool,
}
impl Default for Builder {
fn default() -> Self {
Self {
event_buffer_capacity: ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY,
client_buffer_capacity: ConsoleLayer::DEFAULT_CLIENT_BUFFER_CAPACITY,
publish_interval: ConsoleLayer::DEFAULT_PUBLISH_INTERVAL,
retention: ConsoleLayer::DEFAULT_RETENTION,
poll_duration_max: ConsoleLayer::DEFAULT_POLL_DURATION_MAX,
scheduled_duration_max: ConsoleLayer::DEFAULT_SCHEDULED_DURATION_MAX,
server_addr: ServerAddr::Tcp(SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT)),
recording_path: None,
filter_env_var: "RUST_LOG".to_string(),
self_trace: false,
#[cfg(feature = "grpc-web")]
enable_grpc_web: false,
}
}
}
impl Builder {
/// Overrides the event buffer capacity and returns the updated builder.
///
/// The default is `ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY`.
pub fn event_buffer_capacity(mut self, event_buffer_capacity: usize) -> Self {
    self.event_buffer_capacity = event_buffer_capacity;
    self
}
/// Overrides the client buffer capacity and returns the updated builder.
///
/// The default is `ConsoleLayer::DEFAULT_CLIENT_BUFFER_CAPACITY`.
pub fn client_buffer_capacity(mut self, client_buffer_capacity: usize) -> Self {
    self.client_buffer_capacity = client_buffer_capacity;
    self
}
/// Overrides the publish interval and returns the updated builder.
///
/// The default is `ConsoleLayer::DEFAULT_PUBLISH_INTERVAL`. The same setting
/// is read from `TOKIO_CONSOLE_PUBLISH_INTERVAL` by `with_default_env`.
pub fn publish_interval(mut self, publish_interval: Duration) -> Self {
    self.publish_interval = publish_interval;
    self
}
/// Overrides the retention duration and returns the updated builder.
///
/// The default is `ConsoleLayer::DEFAULT_RETENTION`. The same setting is read
/// from `TOKIO_CONSOLE_RETENTION` by `with_default_env`.
pub fn retention(mut self, retention: Duration) -> Self {
    self.retention = retention;
    self
}
/// Overrides the server address and returns the updated builder.
///
/// Accepts anything convertible into a [`ServerAddr`], such as a
/// `SocketAddr`, an `(ip, port)` tuple or, on Unix, a filesystem path.
pub fn server_addr(mut self, server_addr: impl Into<ServerAddr>) -> Self {
    let converted: ServerAddr = server_addr.into();
    self.server_addr = converted;
    self
}
/// Sets the recording path and returns the updated builder.
///
/// No recording path is configured by default. The same setting is read from
/// `TOKIO_CONSOLE_RECORD_PATH` by `with_default_env`.
pub fn recording_path(mut self, path: impl Into<PathBuf>) -> Self {
    let target: PathBuf = path.into();
    self.recording_path = Some(target);
    self
}
/// Sets the name of the environment variable that `init` parses to build the
/// filter of its `fmt` layer, and returns the updated builder.
///
/// The default variable name is `RUST_LOG`.
pub fn filter_env_var(mut self, filter_env_var: impl Into<String>) -> Self {
    let name: String = filter_env_var.into();
    self.filter_env_var = name;
    self
}
/// Overrides the `poll_duration_max` setting and returns the updated builder.
///
/// The default is `ConsoleLayer::DEFAULT_POLL_DURATION_MAX`.
pub fn poll_duration_histogram_max(mut self, max: Duration) -> Self {
    self.poll_duration_max = max;
    self
}
/// Overrides the `scheduled_duration_max` setting and returns the updated
/// builder.
///
/// The default is `ConsoleLayer::DEFAULT_SCHEDULED_DURATION_MAX`.
pub fn scheduled_duration_histogram_max(mut self, max: Duration) -> Self {
    self.scheduled_duration_max = max;
    self
}
/// Enables or disables self-tracing and returns the updated builder.
///
/// When disabled (the default), `spawn` installs a `NoSubscriber` as the
/// default subscriber of the server thread; when enabled, that thread's
/// default subscriber is left untouched.
pub fn enable_self_trace(mut self, self_trace: bool) -> Self {
    self.self_trace = self_trace;
    self
}
/// Enables or disables gRPC-Web serving and returns the updated builder.
///
/// When enabled, `spawn` runs the server through `serve_with_grpc_web`
/// instead of `serve`. Only available with the `grpc-web` feature; disabled
/// by default.
#[cfg(feature = "grpc-web")]
pub fn enable_grpc_web(mut self, enable_grpc_web: bool) -> Self {
    self.enable_grpc_web = enable_grpc_web;
    self
}
/// Consumes the builder and returns the `(ConsoleLayer, Server)` pair
/// produced by `ConsoleLayer::build`.
///
/// Nothing is started here: `spawn` is the method in this file that runs the
/// returned `Server` on a background thread.
pub fn build(self) -> (ConsoleLayer, Server) {
ConsoleLayer::build(self)
}
pub fn with_default_env(mut self) -> Self {
if let Some(retention) = duration_from_env("TOKIO_CONSOLE_RETENTION") {
self.retention = retention;
}
if let Ok(bind) = std::env::var("TOKIO_CONSOLE_BIND") {
self.server_addr = ServerAddr::Tcp(
bind.to_socket_addrs()
.expect(
"TOKIO_CONSOLE_BIND must be formatted as HOST:PORT, such as localhost:4321",
)
.next()
.expect("tokio console could not resolve TOKIO_CONSOLE_BIND"),
);
}
if let Some(interval) = duration_from_env("TOKIO_CONSOLE_PUBLISH_INTERVAL") {
self.publish_interval = interval;
}
if let Ok(path) = std::env::var("TOKIO_CONSOLE_RECORD_PATH") {
self.recording_path = Some(path.into());
}
if let Some(capacity) = usize_from_env("TOKIO_CONSOLE_BUFFER_CAPACITY") {
self.event_buffer_capacity = capacity;
}
self
}
/// Consumes the builder, spawns the console server, and initializes a
/// `tracing_subscriber` registry composed of the console layer and a filtered
/// `fmt` layer.
///
/// The `fmt` layer's filter is parsed from the environment variable named by
/// `filter_env_var` (an `EnvFilter` with the `env-filter` feature, a `Targets`
/// filter otherwise). When the variable is unset, or its value fails to parse
/// (in which case a message is printed to stderr), the filter falls back to
/// `error`.
// NOTE(review): the trailing `.init()` comes from `tracing_subscriber::prelude`;
// its documentation states it panics if a global default subscriber has
// already been set — confirm against the tracing-subscriber version in use.
pub fn init(self) {
    #[cfg(feature = "env-filter")]
    type Filter = filter::EnvFilter;
    #[cfg(not(feature = "env-filter"))]
    type Filter = filter::Targets;

    // Try the user-provided filter first; a parse failure is reported but is
    // not fatal.
    let from_env = match std::env::var(&self.filter_env_var) {
        Ok(raw) => match raw.parse::<Filter>() {
            Ok(parsed) => Some(parsed),
            Err(e) => {
                eprintln!(
                    "failed to parse filter environment variable `{}={:?}`: {}",
                    &self.filter_env_var, raw, e
                );
                None
            }
        },
        Err(_) => None,
    };

    let fmt_filter = match from_env {
        Some(filter) => filter,
        None => "error"
            .parse::<Filter>()
            .expect("`error` filter should always parse successfully"),
    };

    let console_layer = self.spawn();

    tracing_subscriber::registry()
        .with(console_layer)
        .with(tracing_subscriber::fmt::layer().with_filter(fmt_filter))
        .init();
}
/// Consumes the builder, starts the console server on a dedicated background
/// thread, and returns the console layer wrapped in a metadata filter.
///
/// The `(layer, server)` pair comes from `build`. The server is driven by a
/// current-thread Tokio runtime (I/O and time enabled) on a thread named
/// `console_subscriber`; the returned layer still has to be added to a
/// subscriber by the caller.
///
/// # Panics
///
/// Panics on the calling thread if the background thread cannot be spawned.
/// On the background thread, a failure to build the runtime or an error
/// returned by the server panics that thread.
#[must_use = "a `Layer` must be added to a `tracing::Subscriber` in order to be used"]
pub fn spawn<S>(self) -> impl Layer<S>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
// Decides which metadata the console layer gets to see.
fn console_filter(meta: &tracing::Metadata<'_>) -> bool {
// Events are selected by *target*: anything starting with `runtime` or `tokio`.
if meta.is_event() {
return meta.target().starts_with("runtime") || meta.target().starts_with("tokio");
}
// Everything else is selected by a *name* starting with `runtime.`, or by
// a target starting with `tokio`.
meta.name().starts_with("runtime.") || meta.target().starts_with("tokio")
}
// Copy the flags out first: `self.build()` below consumes the builder.
let self_trace = self.self_trace;
#[cfg(feature = "grpc-web")]
let enable_grpc_web = self.enable_grpc_web;
let (layer, server) = self.build();
// The function item is cast explicitly to a higher-ranked fn pointer type.
// NOTE(review): presumably needed so `FilterFn`'s type parameter is a plain fn pointer — confirm.
let filter =
FilterFn::new(console_filter as for<'r, 's> fn(&'r tracing::Metadata<'s>) -> bool);
let layer = layer.with_filter(filter);
thread::Builder::new()
.name("console_subscriber".into())
.spawn(move || {
// Declared before the runtime so that, when set, the guard stays alive
// for the whole closure body.
let _subscriber_guard;
if !self_trace {
// Without self-tracing, make `NoSubscriber` the default subscriber for
// this thread.
_subscriber_guard = tracing::subscriber::set_default(
tracing_core::subscriber::NoSubscriber::default(),
);
}
let runtime = runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.expect("console subscriber runtime initialization failed");
// Blocks this thread until the server future completes.
runtime.block_on(async move {
// With the `grpc-web` feature and the flag set, serve through the
// gRPC-Web entry point and return early so `server` is not used twice.
#[cfg(feature = "grpc-web")]
if enable_grpc_web {
server
.serve_with_grpc_web(tonic::transport::Server::builder())
.await
.expect("console subscriber server failed");
return;
}
server
.serve()
.await
.expect("console subscriber server failed")
});
})
.expect("console subscriber could not spawn thread");
layer
}
}
/// An address for the console server, as stored in `Builder::server_addr`.
///
/// Values are usually created through the `From` conversions below rather
/// than by naming a variant directly.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ServerAddr {
    /// A TCP socket address.
    Tcp(SocketAddr),
    /// A filesystem path (Unix targets only).
    // NOTE(review): presumably the path of a Unix domain socket — confirm in `Server`.
    #[cfg(unix)]
    Unix(PathBuf),
    /// A vsock address (requires the `vsock` feature).
    #[cfg(feature = "vsock")]
    Vsock(tokio_vsock::VsockAddr),
}

/// A `SocketAddr` is used as-is for the TCP variant.
impl From<SocketAddr> for ServerAddr {
    fn from(addr: SocketAddr) -> Self {
        Self::Tcp(addr)
    }
}

/// An IPv4 socket address becomes a TCP address.
impl From<SocketAddrV4> for ServerAddr {
    fn from(addr: SocketAddrV4) -> Self {
        Self::Tcp(SocketAddr::V4(addr))
    }
}

/// An IPv6 socket address becomes a TCP address.
impl From<SocketAddrV6> for ServerAddr {
    fn from(addr: SocketAddrV6) -> Self {
        Self::Tcp(SocketAddr::V6(addr))
    }
}

/// An `(ip, port)` pair becomes a TCP address; `ip` may be anything that
/// converts into an `IpAddr`.
impl<I> From<(I, u16)> for ServerAddr
where
    I: Into<IpAddr>,
{
    fn from((ip, port): (I, u16)) -> Self {
        Self::Tcp(SocketAddr::new(ip.into(), port))
    }
}

/// An owned path is moved into the Unix variant.
#[cfg(unix)]
impl From<PathBuf> for ServerAddr {
    fn from(path: PathBuf) -> Self {
        Self::Unix(path)
    }
}

/// A borrowed path is copied into an owned `PathBuf` for the Unix variant.
#[cfg(unix)]
impl<'a> From<&'a Path> for ServerAddr {
    fn from(path: &'a Path) -> Self {
        Self::Unix(PathBuf::from(path))
    }
}

/// A vsock address is used as-is for the vsock variant.
#[cfg(feature = "vsock")]
impl From<tokio_vsock::VsockAddr> for ServerAddr {
    fn from(addr: tokio_vsock::VsockAddr) -> Self {
        Self::Vsock(addr)
    }
}
/// Convenience entry point: obtains a builder from `ConsoleLayer::builder()`,
/// applies the environment overrides of `with_default_env`, and calls
/// `Builder::init` on the result.
pub fn init() {
ConsoleLayer::builder().with_default_env().init();
}
/// Convenience entry point: obtains a builder from `ConsoleLayer::builder()`,
/// applies the environment overrides of `with_default_env`, and calls
/// `Builder::spawn`, returning the resulting layer.
///
/// The returned layer does nothing until it is added to a subscriber, which
/// is why the result is marked `#[must_use]`.
#[must_use = "a `Layer` must be added to a `tracing::Subscriber` in order to be used"]
pub fn spawn<S>() -> impl Layer<S>
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    ConsoleLayer::builder().with_default_env().spawn::<S>()
}
/// Reads `var_name` from the environment and parses it as a
/// `humantime::Duration`.
///
/// Returns `None` when `std::env::var` fails for the variable (for example,
/// when it is not set).
///
/// # Panics
///
/// Panics when the variable is set but its value is not a valid duration.
fn duration_from_env(var_name: &str) -> Option<Duration> {
    let raw = std::env::var(var_name).ok()?;
    let parsed = raw.parse::<humantime::Duration>().unwrap_or_else(|e| {
        panic!(
            "failed to parse a duration from `{}={:?}`: {}",
            var_name, raw, e
        )
    });
    Some(parsed.into())
}
/// Reads `var_name` from the environment and parses it as a `usize`.
///
/// Returns `None` when `std::env::var` fails for the variable (for example,
/// when it is not set).
///
/// # Panics
///
/// Panics when the variable is set but its value is not a valid `usize`.
fn usize_from_env(var_name: &str) -> Option<usize> {
    let raw = std::env::var(var_name).ok()?;
    let parsed = raw.parse::<usize>().unwrap_or_else(|e| {
        panic!(
            "failed to parse a usize from `{}={:?}`: {}",
            var_name, raw, e
        )
    });
    Some(parsed)
}