use std::sync::LazyLock;
use http::Uri;
use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
use opentelemetry_zipkin::ZipkinExporter;
use schemars::JsonSchema;
use serde::Deserialize;
use tower::BoxError;
use crate::plugins::telemetry::config::Conf;
use crate::plugins::telemetry::config::SamplerOption;
use crate::plugins::telemetry::endpoint::UriEndpoint;
use crate::plugins::telemetry::reload::tracing::TracingBuilder;
use crate::plugins::telemetry::reload::tracing::TracingConfigurator;
use crate::plugins::telemetry::tracing::BatchProcessorConfig;
use crate::plugins::telemetry::tracing::NamedSpanExporter;
use crate::plugins::telemetry::tracing::NamedTokioRuntime;
use crate::plugins::telemetry::tracing::SpanProcessorExt;
const OTEL_EXPORTER_ZIPKIN_ENDPOINT: &str = "OTEL_EXPORTER_ZIPKIN_ENDPOINT";
static DEFAULT_ENDPOINT: LazyLock<Uri> =
LazyLock::new(|| Uri::from_static("http://127.0.0.1:9411/api/v2/spans"));
#[derive(Debug, Clone, Deserialize, JsonSchema, Default, PartialEq)]
#[serde(deny_unknown_fields)]
#[schemars(rename = "ZipkinConfig")]
pub(crate) struct Config {
pub(crate) enabled: bool,
#[serde(default)]
pub(crate) endpoint: UriEndpoint,
#[serde(default)]
pub(crate) batch_processor: BatchProcessorConfig,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) sampler: Option<SamplerOption>,
}
impl Config {
fn endpoint_with_env_override(&self) -> Result<Uri, BoxError> {
if let Ok(endpoint) = std::env::var(OTEL_EXPORTER_ZIPKIN_ENDPOINT) {
endpoint.parse::<Uri>().map_err(|e| {
format!(
"invalid URI in {}: '{}': {}",
OTEL_EXPORTER_ZIPKIN_ENDPOINT, endpoint, e
)
.into()
})
} else {
Ok(self.endpoint.to_full_uri(&DEFAULT_ENDPOINT))
}
}
}
impl TracingConfigurator for Config {
fn config(conf: &Conf) -> &Self {
&conf.exporters.tracing.zipkin
}
fn is_enabled(&self) -> bool {
self.enabled
}
fn configure(&self, builder: &mut TracingBuilder) -> Result<(), BoxError> {
tracing::info!("configuring Zipkin tracing: {}", self.batch_processor);
let endpoint = self.endpoint_with_env_override()?;
let exporter = ZipkinExporter::builder()
.with_collector_endpoint(endpoint.to_string())
.build()?;
let named_exporter = NamedSpanExporter::new(exporter, "zipkin");
let batch_span_processor =
BatchSpanProcessor::builder(named_exporter, NamedTokioRuntime::new("zipkin-tracing"))
.with_batch_config(self.batch_processor.clone().with_env_overrides()?.into())
.build()
.filtered();
if let Some(sampler) = &self.sampler {
let common = builder.tracing_common();
let sampled_batch_span_processor = batch_span_processor.with_sampler(
sampler,
common.parent_based_sampler,
&common.sampler,
);
builder.with_span_processor(sampled_batch_span_processor);
} else {
builder.with_span_processor(batch_span_processor);
}
Ok(())
}
}