use std::str::FromStr;
use opentelemetry::{KeyValue, trace::TracerProvider};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
Resource,
propagation::TraceContextPropagator,
trace::{RandomIdGenerator, Sampler},
};
use opentelemetry_semantic_conventions::resource::{
DEPLOYMENT_ENVIRONMENT_NAME, K8S_CLUSTER_NAME, K8S_NAMESPACE_NAME, K8S_POD_NAME, SERVICE_NAME,
};
use tracing_subscriber::{EnvFilter, Layer, registry::LookupSpan};
use url::Url;
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ServiceDescriptor {
pub k8s_pod_name: String,
pub k8s_namespace_name: String,
pub k8s_cluster_name: String,
pub deployment_environment: String,
pub service_name: String,
}
impl Default for ServiceDescriptor {
fn default() -> Self {
Self::new()
}
}
impl ServiceDescriptor {
pub const K8S_POD_NAME_ENV_VAR: &'static str = "POD_NAME";
pub const K8S_NAMESPACE_NAME_ENV_VAR: &'static str = "POD_NAMESPACE";
pub const K8S_CLUSTER_NAME_ENV_VAR: &'static str = "CLUSTER_NAME";
pub const DEPLOYMENT_ENVIRONMENT_ENV_VAR: &'static str = "DEPLOYMENT_ENVIRONMENT";
pub const SERVICE_NAME_ENV_VAR: &'static str = "SERVICE_NAME";
pub const DEFAULT_K8S_POD_NAME: &'static str = "wire_framework-0";
pub const DEFAULT_K8S_NAMESPACE_NAME: &'static str = "local";
pub const DEFAULT_K8S_CLUSTER_NAME: &'static str = "local";
pub const DEFAULT_DEPLOYMENT_ENVIRONMENT: &'static str = "local";
pub const DEFAULT_SERVICE_NAME: &'static str = "wire_framework";
pub fn new() -> Self {
fn env_or(env_var: &str, default: &str) -> String {
std::env::var(env_var).unwrap_or_else(|_| default.to_string())
}
Self {
k8s_pod_name: env_or(Self::K8S_POD_NAME_ENV_VAR, Self::DEFAULT_K8S_POD_NAME),
k8s_namespace_name: env_or(
Self::K8S_NAMESPACE_NAME_ENV_VAR,
Self::DEFAULT_K8S_NAMESPACE_NAME,
),
k8s_cluster_name: env_or(
Self::K8S_CLUSTER_NAME_ENV_VAR,
Self::DEFAULT_K8S_CLUSTER_NAME,
),
deployment_environment: env_or(
Self::DEPLOYMENT_ENVIRONMENT_ENV_VAR,
Self::DEFAULT_DEPLOYMENT_ENVIRONMENT,
),
service_name: env_or(Self::SERVICE_NAME_ENV_VAR, Self::DEFAULT_SERVICE_NAME),
}
}
pub fn with_k8s_pod_name(mut self, k8s_pod_name: Option<String>) -> Self {
if let Some(k8s_pod_name) = k8s_pod_name {
self.k8s_pod_name = k8s_pod_name;
}
self
}
pub fn with_k8s_namespace_name(mut self, k8s_namespace_name: Option<String>) -> Self {
if let Some(k8s_namespace_name) = k8s_namespace_name {
self.k8s_namespace_name = k8s_namespace_name;
}
self
}
pub fn with_service_name(mut self, service_name: Option<String>) -> Self {
if let Some(service_name) = service_name {
self.service_name = service_name;
}
self
}
fn into_otlp_resource(self) -> Resource {
Resource::builder()
.with_attribute(KeyValue::new(K8S_POD_NAME, self.k8s_pod_name))
.with_attribute(KeyValue::new(K8S_NAMESPACE_NAME, self.k8s_namespace_name))
.with_attribute(KeyValue::new(K8S_CLUSTER_NAME, self.k8s_cluster_name))
.with_attribute(KeyValue::new(
DEPLOYMENT_ENVIRONMENT_NAME,
self.deployment_environment,
))
.with_attribute(KeyValue::new(SERVICE_NAME, self.service_name))
.build()
}
}
#[derive(Debug)]
pub struct OpenTelemetry {
pub opentelemetry_level: OpenTelemetryLevel,
pub tracing_endpoint: Option<Url>,
pub logging_endpoint: Option<Url>,
pub service: ServiceDescriptor,
}
impl OpenTelemetry {
pub fn new(
opentelemetry_level: &str,
tracing_endpoint: Option<String>,
logging_endpoint: Option<String>,
) -> Result<Self, OpenTelemetryLayerError> {
fn parse_url(url: Option<String>) -> Result<Option<Url>, OpenTelemetryLayerError> {
url.map(|v| {
v.parse()
.map_err(|e| OpenTelemetryLayerError::InvalidUrl(v, e))
})
.transpose()
}
Ok(Self {
opentelemetry_level: opentelemetry_level.parse()?,
tracing_endpoint: parse_url(tracing_endpoint)?,
logging_endpoint: parse_url(logging_endpoint)?,
service: ServiceDescriptor::new(),
})
}
pub fn with_service_descriptor(mut self, service: ServiceDescriptor) -> Self {
self.service = service;
self
}
pub(super) fn logs_layer<S>(
self,
) -> Option<(opentelemetry_sdk::logs::SdkLoggerProvider, impl Layer<S>)>
where
S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
{
let logging_endpoint = self.logging_endpoint.clone()?;
let resource = self.service.clone().into_otlp_resource();
let exporter = opentelemetry_otlp::LogExporter::builder()
.with_http()
.with_endpoint(logging_endpoint)
.build()
.expect("Failed to create OTLP exporter");
let provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
.with_batch_exporter(exporter)
.with_resource(resource)
.build();
let layer =
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(&provider);
Some((provider, layer))
}
pub(super) fn tracing_layer<S>(
&self,
) -> Option<(opentelemetry_sdk::trace::SdkTracerProvider, impl Layer<S>)>
where
S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
{
let tracing_endpoint = self.tracing_endpoint.clone()?;
let filter = self
.filter()
.add_directive("otel::tracing=trace".parse().unwrap())
.add_directive("otel=debug".parse().unwrap());
let service_name = self.service.service_name.clone();
let resource = self.service.clone().into_otlp_resource();
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_endpoint(tracing_endpoint)
.build()
.expect("Failed to create OTLP exporter");
let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.with_id_generator(RandomIdGenerator::default())
.with_sampler(Sampler::AlwaysOn)
.with_resource(resource)
.build();
let tracer = provider.tracer(service_name);
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
let layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter);
Some((provider, layer))
}
fn filter(&self) -> EnvFilter {
match self.opentelemetry_level {
OpenTelemetryLevel::OFF => EnvFilter::new("off"),
OpenTelemetryLevel::INFO => EnvFilter::new("info"),
OpenTelemetryLevel::DEBUG => EnvFilter::new("debug"),
OpenTelemetryLevel::TRACE => EnvFilter::new("trace"),
}
}
}
#[derive(Copy, Clone, Debug, Default)]
pub enum OpenTelemetryLevel {
#[default]
OFF,
INFO,
DEBUG,
TRACE,
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum OpenTelemetryLayerError {
#[error("Invalid OpenTelemetry level format")]
InvalidFormat,
#[error("Invalid URL: \"{0}\" - {1}")]
InvalidUrl(String, url::ParseError),
}
impl FromStr for OpenTelemetryLevel {
type Err = OpenTelemetryLayerError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"off" => Ok(OpenTelemetryLevel::OFF),
"info" => Ok(OpenTelemetryLevel::INFO),
"debug" => Ok(OpenTelemetryLevel::DEBUG),
"trace" => Ok(OpenTelemetryLevel::TRACE),
_ => Err(OpenTelemetryLayerError::InvalidFormat),
}
}
}