use dragonfly_client_config::dfdaemon::Host;
use opentelemetry::{global, trace::TracerProvider, KeyValue};
use opentelemetry_otlp::{WithExportConfig, WithTonicConfig};
use opentelemetry_sdk::{propagation::TraceContextPropagator, Resource};
use rolling_file::*;
use std::fs;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
use tracing::{info, Level};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{
filter::LevelFilter,
fmt::{time::ChronoLocal, Layer},
prelude::*,
EnvFilter, Registry,
};
const SPAN_EXPORTER_TIMEOUT: Duration = Duration::from_secs(10);
#[allow(clippy::too_many_arguments)]
pub fn init_tracing(
name: &str,
log_dir: PathBuf,
log_level: Level,
log_max_files: usize,
otel_protocol: Option<String>,
otel_endpoint: Option<String>,
otel_path: Option<PathBuf>,
otel_headers: Option<reqwest::header::HeaderMap>,
host: Option<Host>,
is_seed_peer: bool,
console: bool,
) -> Vec<WorkerGuard> {
let mut guards = vec![];
let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(stdout_guard);
let stdout_filter = if console {
LevelFilter::DEBUG
} else {
LevelFilter::OFF
};
let stdout_logging_layer = Layer::new()
.with_writer(stdout_writer)
.with_file(true)
.with_line_number(true)
.with_target(false)
.with_thread_names(false)
.with_thread_ids(false)
.with_timer(ChronoLocal::rfc_3339())
.pretty()
.with_filter(stdout_filter);
fs::create_dir_all(log_dir.clone()).expect("failed to create log directory");
let rolling_appender = BasicRollingFileAppender::new(
log_dir.join(name).with_extension("log"),
RollingConditionBasic::new().hourly(),
log_max_files,
)
.expect("failed to create rolling file appender");
let (rolling_writer, rolling_writer_guard) = tracing_appender::non_blocking(rolling_appender);
guards.push(rolling_writer_guard);
let file_logging_layer = Layer::new()
.with_writer(rolling_writer)
.with_ansi(false)
.with_file(true)
.with_line_number(true)
.with_target(false)
.with_thread_names(false)
.with_thread_ids(false)
.with_timer(ChronoLocal::rfc_3339())
.compact();
let env_filter = EnvFilter::from_default_env().add_directive(log_level.into());
let console_subscriber_layer = if log_level == Level::TRACE {
Some(console_subscriber::spawn())
} else {
None
};
let subscriber = Registry::default()
.with(env_filter)
.with(console_subscriber_layer)
.with(file_logging_layer)
.with(stdout_logging_layer);
if let (Some(protocol), Some(endpoint)) = (otel_protocol, otel_endpoint) {
let otlp_exporter = match protocol.as_str() {
"grpc" => {
let mut metadata = MetadataMap::new();
if let Some(headers) = otel_headers {
for (key, value) in headers.iter() {
metadata.insert(
MetadataKey::from_str(key.as_str())
.expect("failed to create metadata key"),
MetadataValue::from_str(value.to_str().unwrap())
.expect("failed to create metadata value"),
);
}
}
let endpoint_url = url::Url::parse(&format!("http://{}", endpoint))
.expect("failed to parse OTLP endpoint URL");
opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint_url)
.with_timeout(SPAN_EXPORTER_TIMEOUT)
.with_metadata(metadata)
.build()
.expect("failed to create OTLP exporter")
}
"http" | "https" => {
let mut endpoint_url = url::Url::parse(&format!("{}://{}", protocol, endpoint))
.expect("failed to parse OTLP endpoint URL");
if let Some(path) = otel_path {
endpoint_url = endpoint_url
.join(path.to_str().unwrap())
.expect("failed to join OTLP endpoint path");
}
opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_endpoint(endpoint_url.as_str())
.with_protocol(opentelemetry_otlp::Protocol::HttpJson)
.with_timeout(SPAN_EXPORTER_TIMEOUT)
.build()
.expect("failed to create OTLP exporter")
}
_ => {
panic!("unsupported OTLP protocol: {}", protocol);
}
};
let host = host.unwrap();
let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(otlp_exporter)
.with_resource(
Resource::builder()
.with_service_name(format!("{}-{}", name, host.ip.unwrap()))
.with_schema_url(
[
KeyValue::new(
opentelemetry_semantic_conventions::attribute::SERVICE_NAMESPACE,
"dragonfly",
),
KeyValue::new(
opentelemetry_semantic_conventions::attribute::HOST_NAME,
host.hostname,
),
KeyValue::new(
opentelemetry_semantic_conventions::attribute::HOST_IP,
host.ip.unwrap().to_string(),
),
],
opentelemetry_semantic_conventions::SCHEMA_URL,
)
.with_attribute(opentelemetry::KeyValue::new(
"host.idc",
host.idc.unwrap_or_default(),
))
.with_attribute(opentelemetry::KeyValue::new(
"host.location",
host.location.unwrap_or_default(),
))
.with_attribute(opentelemetry::KeyValue::new("host.seed_peer", is_seed_peer))
.build(),
)
.build();
let tracer = provider.tracer(name.to_string());
global::set_tracer_provider(provider.clone());
global::set_text_map_propagator(TraceContextPropagator::new());
let jaeger_layer = OpenTelemetryLayer::new(tracer);
subscriber.with(jaeger_layer).init();
} else {
subscriber.init();
}
std::panic::set_hook(Box::new(tracing_panic::panic_hook));
info!(
"tracing initialized directory: {}, level: {}",
log_dir.as_path().display(),
log_level
);
guards
}
#[allow(clippy::too_many_arguments)]
pub fn init_command_tracing(log_level: Level, console: bool) -> Vec<WorkerGuard> {
let mut guards = vec![];
let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(stdout_guard);
let stdout_filter = if console {
LevelFilter::DEBUG
} else {
LevelFilter::OFF
};
let stdout_logging_layer = Layer::new()
.with_writer(stdout_writer)
.with_file(true)
.with_line_number(true)
.with_target(false)
.with_thread_names(false)
.with_thread_ids(false)
.with_timer(ChronoLocal::rfc_3339())
.pretty()
.with_filter(stdout_filter);
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::default().add_directive(log_level.into()));
let console_subscriber_layer = if log_level == Level::TRACE {
Some(console_subscriber::spawn())
} else {
None
};
let subscriber = Registry::default()
.with(env_filter)
.with(console_subscriber_layer)
.with(stdout_logging_layer);
subscriber.init();
std::panic::set_hook(Box::new(tracing_panic::panic_hook));
info!("tracing initialized level: {}", log_level);
guards
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_init_tracing_comprehensive() {
let temp_dir = TempDir::new().expect("failed to create temp dir");
let log_dir = temp_dir.path().join("logs");
assert!(!log_dir.exists());
let guards = init_tracing(
"test-service",
log_dir.clone(),
Level::INFO,
10,
None,
None,
None,
None,
None,
false,
false,
);
assert!(log_dir.exists());
assert!(log_dir.is_dir());
assert!(!guards.is_empty());
assert_eq!(guards.len(), 2);
}
}