use crate::cli_state::journeys::APP_NAME;
use crate::logs::ockam_tonic_logs_client::OckamTonicLogsClient;
use crate::logs::ockam_tonic_traces_client::OckamTonicTracesClient;
use crate::logs::secure_client_service::SecureClientService;
use crate::logs::tracing_guard::TracingGuard;
use crate::logs::{
ExportingConfiguration, LoggingConfiguration, OckamLogExporter, OckamLogFormat,
OckamUserLogFormat, TelemetryEndpoint,
};
use crate::logs::{LogFormat, OckamSpanExporter};
use gethostname::gethostname;
use ockam_node::Context;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{Compression, WithExportConfig, WithTonicConfig};
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::export::trace::SpanExporter;
use opentelemetry_sdk::logs::{BatchLogProcessor, LoggerProvider};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::{BatchConfig, BatchConfigBuilder, BatchSpanProcessor};
use opentelemetry_sdk::{logs, Resource};
use opentelemetry_semantic_conventions::attribute;
use std::io::{empty, stdout};
use tonic::codec::CompressionEncoding;
use tonic::metadata::*;
use tracing_appender::non_blocking::NonBlocking;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_core::Subscriber;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::filter::{filter_fn, FilterFn};
use tracing_subscriber::fmt::format::{DefaultFields, Format};
use tracing_subscriber::fmt::layer;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, layer::SubscriberExt, registry, Layer};
/// Namespace for the functions which install the global tracing subscriber,
/// with or without OpenTelemetry exporters.
pub struct LoggingTracing;

impl LoggingTracing {
    /// Install the global subscriber matching the given configurations.
    ///
    ///  - exporting disabled: only local logging is set up
    ///  - exporting enabled, logging disabled: only span tracing is set up
    ///  - exporting and logging enabled: logs and spans are both exported
    ///
    /// The returned [`TracingGuard`] holds whatever the selected set-up produced
    /// (worker guard and/or providers).
    ///
    /// # Panics
    ///
    /// Panics if an exporter cannot be created or if the subscriber cannot be
    /// initialized.
    pub fn setup(
        logging_configuration: &LoggingConfiguration,
        exporting_configuration: &ExportingConfiguration,
        app_name: &str,
        node_name: Option<String>,
        ctx: &Context,
    ) -> TracingGuard {
        if !exporting_configuration.is_enabled() {
            return Self::setup_local_logging_only(logging_configuration);
        }

        // The span exporter is needed by both remaining set-ups.
        let span_exporter = create_span_exporter(exporting_configuration, ctx);
        if !logging_configuration.is_enabled() {
            return Self::setup_tracing_only(
                span_exporter,
                logging_configuration,
                exporting_configuration,
                app_name,
                node_name,
            );
        }

        let log_exporter = create_log_exporter(exporting_configuration, ctx);
        Self::setup_with_exporters(
            span_exporter,
            log_exporter,
            logging_configuration,
            exporting_configuration,
            app_name,
            node_name,
        )
    }

    /// Install a subscriber which exports both spans and logs with the given
    /// exporters, and which also writes log events with a local appender.
    ///
    /// The W3C trace context propagator is registered globally once the
    /// subscriber is installed.
    ///
    /// # Panics
    ///
    /// Panics if the global subscriber cannot be initialized.
    pub fn setup_with_exporters<
        T: SpanExporter + Send + 'static,
        L: LogExporter + Send + 'static,
    >(
        span_exporter: T,
        log_exporter: L,
        logging_configuration: &LoggingConfiguration,
        exporting_configuration: &ExportingConfiguration,
        app_name: &str,
        node_name: Option<String>,
    ) -> TracingGuard {
        // Prefix of the targets used by the OpenTelemetry crates for their own events.
        const OPENTELEMETRY_TARGET: &str = "opentelemetry";

        let (log_bridge, logger_provider) =
            create_opentelemetry_logging_layer(app_name, exporting_configuration, log_exporter);
        let (span_layer, tracer_provider) = create_opentelemetry_tracing_layer(
            app_name,
            node_name,
            exporting_configuration,
            span_exporter,
        );
        let (local_appender, worker_guard) = create_opentelemetry_appender(logging_configuration);

        // Events emitted by the OpenTelemetry crates are printed on stderr...
        let stderr_layer = fmt::Layer::new()
            .with_writer(std::io::stderr)
            .with_filter(filter_fn(|metadata| {
                metadata.target().starts_with(OPENTELEMETRY_TARGET)
            }));

        // ...and are kept away from the log exporter
        // NOTE(review): presumably to avoid re-exporting the exporter's own diagnostics — confirm
        let skip_opentelemetry: FilterFn<fn(&tracing_core::metadata::Metadata<'_>) -> bool> =
            filter_fn(|metadata| !metadata.target().starts_with(OPENTELEMETRY_TARGET));
        let log_bridge = log_bridge.with_filter(skip_opentelemetry);

        let subscriber = registry()
            .with(logging_configuration.env_filter())
            .with(tracing_error::ErrorLayer::default())
            .with(span_layer)
            .with(stderr_layer)
            .with(log_bridge);

        // The local appender is added last, formatted as requested by the configuration.
        let initialized = match logging_configuration.format() {
            LogFormat::Default => subscriber
                .with(local_appender.event_format(OckamLogFormat::new()))
                .try_init(),
            LogFormat::User => subscriber
                .with(local_appender.event_format(OckamUserLogFormat::new()))
                .try_init(),
            LogFormat::Json => subscriber.with(local_appender.json()).try_init(),
            LogFormat::Pretty => subscriber.with(local_appender.pretty()).try_init(),
        };
        initialized.expect("Failed to initialize tracing subscriber");

        global::set_text_map_propagator(TraceContextPropagator::default());
        TracingGuard::new(worker_guard, logger_provider, tracer_provider)
    }

    /// Install a subscriber which only writes log events locally (no exporters).
    ///
    /// The appender is always created, but the subscriber is only installed
    /// when logging is enabled.
    ///
    /// # Panics
    ///
    /// Panics if the appender cannot be created or if the global subscriber
    /// cannot be initialized.
    pub fn setup_local_logging_only(logging_configuration: &LoggingConfiguration) -> TracingGuard {
        let (local_appender, worker_guard) = make_logging_appender(logging_configuration);
        if !logging_configuration.is_enabled() {
            return TracingGuard::guard_only(worker_guard);
        }

        let subscriber = registry().with(logging_configuration.env_filter());
        let initialized = match logging_configuration.format() {
            LogFormat::Default => subscriber
                .with(local_appender.event_format(OckamLogFormat::new()))
                .try_init(),
            LogFormat::User => subscriber
                .with(local_appender.event_format(OckamUserLogFormat::new()))
                .try_init(),
            LogFormat::Json => subscriber.with(local_appender.json()).try_init(),
            LogFormat::Pretty => subscriber.with(local_appender.pretty()).try_init(),
        };
        initialized.expect("Failed to initialize tracing subscriber");

        TracingGuard::guard_only(worker_guard)
    }

    /// Install a subscriber which only exports spans: no log exporter and no
    /// local appender are registered. The logging configuration is only used
    /// for its env filter.
    ///
    /// The W3C trace context propagator is registered globally once the
    /// subscriber is installed.
    ///
    /// # Panics
    ///
    /// Panics if the global subscriber cannot be initialized.
    pub fn setup_tracing_only<T: SpanExporter + Send + 'static>(
        span_exporter: T,
        logging_configuration: &LoggingConfiguration,
        exporting_configuration: &ExportingConfiguration,
        app_name: &str,
        node_name: Option<String>,
    ) -> TracingGuard {
        let (span_layer, tracer_provider) = create_opentelemetry_tracing_layer(
            app_name,
            node_name,
            exporting_configuration,
            span_exporter,
        );

        registry()
            .with(logging_configuration.env_filter())
            .with(tracing_error::ErrorLayer::default())
            .with(span_layer)
            .try_init()
            .expect("Failed to initialize tracing subscriber");

        global::set_text_map_propagator(TraceContextPropagator::default());
        TracingGuard::tracing_only(tracer_provider)
    }
}
fn create_log_exporter(
exporting_configuration: &ExportingConfiguration,
ctx: &Context,
) -> opentelemetry_otlp::LogExporter {
let log_export_timeout = exporting_configuration.log_export_timeout();
match exporting_configuration.opentelemetry_endpoint() {
TelemetryEndpoint::SecureChannelEndpoint(client, forwarder_service_name) => {
opentelemetry_otlp::LogExporter::new(OckamTonicLogsClient::new(
SecureClientService::new(client, ctx, &forwarder_service_name),
get_otlp_headers(),
Some(CompressionEncoding::Gzip),
))
}
TelemetryEndpoint::HttpsEndpoint(url) => opentelemetry_otlp::LogExporter::new(
opentelemetry_otlp::LogExporter::builder()
.with_tonic()
.with_endpoint(url.clone())
.with_timeout(log_export_timeout)
.with_metadata(get_otlp_headers())
.with_tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots())
.build()
.expect("failed to create the log exporter"),
),
}
}
/// Create the OTLP exporter used to send spans to the configured telemetry endpoint.
///
///  - `SecureChannelEndpoint`: the spans are sent with an [`OckamTonicTracesClient`]
///    built on a [`SecureClientService`] for the given forwarder service
///  - `HttpsEndpoint`: the spans are sent with a tonic exporter using TLS
///    (native roots) and the configured span export timeout
///
/// In both cases the headers returned by `get_otlp_headers` are attached and
/// gzip compression is requested.
///
/// # Panics
///
/// Panics if the HTTPS exporter cannot be built.
fn create_span_exporter(
    exporting_configuration: &ExportingConfiguration,
    ctx: &Context,
) -> opentelemetry_otlp::SpanExporter {
    let timeout = exporting_configuration.span_export_timeout();
    match exporting_configuration.opentelemetry_endpoint() {
        TelemetryEndpoint::SecureChannelEndpoint(client, forwarder_service_name) => {
            let service = SecureClientService::new(client, ctx, &forwarder_service_name);
            let traces_client = OckamTonicTracesClient::new(
                service,
                get_otlp_headers(),
                Some(CompressionEncoding::Gzip),
            );
            opentelemetry_otlp::SpanExporter::new(traces_client)
        }
        TelemetryEndpoint::HttpsEndpoint(url) => {
            let tonic_exporter = opentelemetry_otlp::SpanExporter::builder()
                .with_tonic()
                .with_endpoint(url.clone())
                .with_timeout(timeout)
                .with_metadata(get_otlp_headers())
                .with_compression(Compression::Gzip)
                .with_tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots())
                .build()
                .expect("failed to create the span exporter");
            opentelemetry_otlp::SpanExporter::new(tonic_exporter)
        }
    }
}
/// Create the subscriber layer which records spans with OpenTelemetry, and
/// return it with the tracer provider backing it.
///
/// The span exporter is wrapped in an [`OckamSpanExporter`] (carrying the node
/// name, the "ockam developer" flag and the span export cutoff) and spans are
/// exported in batches configured from `exporting_configuration`.
fn create_opentelemetry_tracing_layer<
    R: Subscriber + Send + 'static + for<'a> LookupSpan<'a>,
    S: SpanExporter + Send + 'static,
>(
    app_name: &str,
    node_name: Option<String>,
    exporting_configuration: &ExportingConfiguration,
    span_exporter: S,
) -> (
    OpenTelemetryLayer<R, opentelemetry_sdk::trace::Tracer>,
    opentelemetry_sdk::trace::TracerProvider,
) {
    let batching = BatchConfigBuilder::default()
        .with_max_export_timeout(exporting_configuration.span_export_timeout())
        .with_scheduled_delay(exporting_configuration.span_export_scheduled_delay())
        .with_max_concurrent_exports(8)
        .with_max_queue_size(exporting_configuration.span_export_queue_size() as usize)
        .build();

    let wrapped_exporter = OckamSpanExporter::new(
        span_exporter,
        node_name,
        exporting_configuration.is_ockam_developer(),
        exporting_configuration.span_export_cutoff(),
    );

    let (tracer, provider) = create_tracer(app_name.to_string(), batching, wrapped_exporter);
    let span_layer = tracing_opentelemetry::layer().with_tracer(tracer);
    (span_layer, provider)
}
/// Create the subscriber layer which bridges tracing events to OpenTelemetry
/// logs, and return it with the logger provider backing it.
///
/// The log exporter is wrapped in an [`OckamLogExporter`] (carrying the log
/// export cutoff) and logs are exported in batches, on the Tokio runtime,
/// with the timeout / delay / queue size read from `exporting_configuration`.
fn create_opentelemetry_logging_layer<L: LogExporter + Send + 'static>(
    app_name: &str,
    exporting_configuration: &ExportingConfiguration,
    log_exporter: L,
) -> (
    OpenTelemetryTracingBridge<LoggerProvider, logs::Logger>,
    LoggerProvider,
) {
    let resource = make_resource(app_name.to_string());

    let batching = logs::BatchConfigBuilder::default()
        .with_max_export_timeout(exporting_configuration.log_export_timeout())
        .with_scheduled_delay(exporting_configuration.log_export_scheduled_delay())
        .with_max_queue_size(exporting_configuration.log_export_queue_size() as usize)
        .build();

    let wrapped_exporter =
        OckamLogExporter::new(log_exporter, exporting_configuration.log_export_cutoff());
    let processor = BatchLogProcessor::builder(wrapped_exporter, opentelemetry_sdk::runtime::Tokio)
        .with_batch_config(batching)
        .build();

    let logger_provider = LoggerProvider::builder()
        .with_resource(resource)
        .with_log_processor(processor)
        .build();

    let bridge = OpenTelemetryTracingBridge::new(&logger_provider);
    (bridge, logger_provider)
}
/// Create the local appender used when exporters are set up.
///
/// When logging is enabled this is the regular appender returned by
/// `make_logging_appender`. Otherwise the appender writes to an empty writer,
/// so nothing is output locally.
///
/// The returned [`WorkerGuard`] belongs to the non-blocking writer and is
/// handed back to the caller together with the layer.
fn create_opentelemetry_appender<S>(
    logging_configuration: &LoggingConfiguration,
) -> (
    fmt::Layer<S, DefaultFields, Format, NonBlocking>,
    WorkerGuard,
)
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    if logging_configuration.is_enabled() {
        return make_logging_appender(logging_configuration);
    }

    // Logging is disabled: discard everything that is written.
    let (discarding_writer, guard) = tracing_appender::non_blocking(empty());
    (layer().with_writer(discarding_writer), guard)
}
/// Create a formatting layer writing log events through a non-blocking writer.
///
///  - with a configured log directory: events go to daily-rotated files named
///    with the `stdout` prefix and the `log` suffix, keeping at most
///    `max_files` files
///  - without a log directory: events go to stdout
///
/// ANSI colors are toggled according to the configuration.
///
/// # Panics
///
/// Panics if the rolling file appender cannot be created.
fn make_logging_appender<S>(
    logging_configuration: &LoggingConfiguration,
) -> (
    fmt::Layer<S, DefaultFields, Format, NonBlocking>,
    WorkerGuard,
)
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    let base_layer = layer().with_ansi(logging_configuration.is_colored());

    let (writer, guard) = if let Some(log_dir) = logging_configuration.log_dir() {
        let rolling_files = RollingFileAppender::builder()
            .rotation(Rotation::DAILY)
            .max_log_files(logging_configuration.max_files() as usize)
            .filename_prefix("stdout")
            .filename_suffix("log")
            .build(log_dir)
            .expect("Failed to create rolling file appender");
        tracing_appender::non_blocking(rolling_files)
    } else {
        tracing_appender::non_blocking(stdout())
    };

    (base_layer.with_writer(writer), guard)
}
/// Create a tracer named `ockam` and its provider.
///
/// Spans are exported in batches, on the Tokio runtime, with the given
/// exporter and batch configuration. A clone of the provider is also
/// registered as the global tracer provider.
fn create_tracer<S: SpanExporter + 'static>(
    app_name: String,
    batch_config: BatchConfig,
    exporter: S,
) -> (
    opentelemetry_sdk::trace::Tracer,
    opentelemetry_sdk::trace::TracerProvider,
) {
    let batch_processor = BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio)
        .with_batch_config(batch_config)
        .build();

    let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
        .with_span_processor(batch_processor)
        .with_resource(make_resource(app_name))
        .build();

    let ockam_tracer = tracer_provider.tracer("ockam");

    // The value returned by the registration is deliberately ignored.
    let _ = global::set_tracer_provider(tracer_provider.clone());

    (ockam_tracer, tracer_provider)
}
/// Create the resource attached to exported telemetry: the service name
/// (always `ockam`), the host name of the current machine and the application name.
fn make_resource(app_name: String) -> Resource {
    // Invalid UTF-8 in the host name is replaced rather than rejected.
    let host = gethostname().to_string_lossy().into_owned();

    let service_attribute = KeyValue::new(attribute::SERVICE_NAME, "ockam");
    let host_attribute = KeyValue::new(attribute::HOST_NAME, host);
    let app_attribute = KeyValue::new(APP_NAME.clone(), app_name);

    Resource::new(vec![service_attribute, host_attribute, app_attribute])
}
/// Return the metadata to attach to OTLP requests.
///
/// The `OCKAM_OPENTELEMETRY_HEADERS` environment variable is read as a single
/// `key=value` pair, split on the first `=`. An empty map is returned when the
/// variable cannot be read, contains no `=`, or when either the key or the
/// value is not valid metadata.
fn get_otlp_headers() -> MetadataMap {
    let raw = match std::env::var("OCKAM_OPENTELEMETRY_HEADERS") {
        Ok(raw) => raw,
        Err(_) => return MetadataMap::default(),
    };

    let (name, content) = match raw.split_once('=') {
        Some(pair) => pair,
        None => return MetadataMap::default(),
    };

    let parsed = (
        MetadataKey::from_bytes(name.as_bytes()),
        MetadataValue::try_from(content.to_string()),
    );
    if let (Ok(name), Ok(content)) = parsed {
        let mut headers = MetadataMap::with_capacity(1);
        headers.insert(name, content);
        headers
    } else {
        MetadataMap::default()
    }
}