use super::provider;
use async_trait::async_trait;
use humantime::parse_duration;
use opentelemetry::{
global::{self, BoxedTracer},
propagation::{TextMapCompositePropagator, TextMapPropagator},
trace::TracerProvider,
};
use opentelemetry_otlp::{Compression, WithExportConfig, WithTonicConfig};
use opentelemetry_sdk::{
Resource,
propagation::{BaggagePropagator, TraceContextPropagator},
trace::{BatchConfigBuilder, RandomIdGenerator, Sampler},
};
use pingora::{server::ShutdownWatch, services::background::BackgroundService};
use std::time::Duration;
use tracing::{error, info};
use url::Url;
/// `tracing` target attached to every log line emitted by this module.
const LOG_TARGET: &str = "pingap::otel";

// --- Exporter defaults ------------------------------------------------------

/// Timeout handed to the OTLP span exporter when the endpoint does not
/// override it with a `timeout` query parameter.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(3);

// --- Per-span limit defaults ------------------------------------------------

/// Default cap on the number of attributes recorded per span.
const DEFAULT_MAX_ATTRIBUTES: u32 = 16;
/// Default cap on the number of events recorded per span.
const DEFAULT_MAX_EVENTS: u32 = 16;

// --- Batch span processor defaults ------------------------------------------

/// Default capacity of the batch processor queue.
const DEFAULT_MAX_QUEUE_SIZE: usize = 2048;
/// Default upper bound on the number of spans sent in one export batch.
const DEFAULT_MAX_EXPORT_BATCH_SIZE: usize = 512;
/// Default delay between two scheduled batch exports.
const DEFAULT_SCHEDULED_DELAY: Duration = Duration::from_secs(5);
/// Default value stored in `TracerConfig::max_export_timeout`.
const DEFAULT_MAX_EXPORT_TIMEOUT: Duration = Duration::from_secs(30);
/// Tunable settings for a tracer service.
///
/// Every field has a default (see the `Default` impl) and can be overridden
/// through query parameters on the endpoint URL handed to the builder.
#[derive(Debug, Clone)]
pub struct TracerConfig {
/// Timeout passed to the OTLP span exporter (`timeout` query parameter).
timeout: Duration,
/// Maximum number of attributes kept per span (`max_attributes`).
max_attributes: u32,
/// Maximum number of events kept per span (`max_events`).
max_events: u32,
/// Queue size given to the batch span processor (`max_queue_size`).
max_queue_size: usize,
/// Delay between scheduled batch exports (`scheduled_delay`).
scheduled_delay: Duration,
/// Maximum number of spans per export batch (`max_export_batch_size`).
max_export_batch_size: usize,
/// Export timeout parsed from `max_export_timeout`.
// NOTE(review): this value is stored but not handed to the batch processor
// in the code visible here — confirm whether that is intentional.
max_export_timeout: Duration,
/// When true, a Jaeger propagator is added next to the trace-context one.
support_jaeger_propagator: bool,
/// When true, a baggage propagator is added next to the trace-context one.
support_baggage_propagator: bool,
/// Compression applied to the exporter; `None` leaves the exporter default.
compression: Option<Compression>,
}
impl Default for TracerConfig {
fn default() -> Self {
Self {
timeout: DEFAULT_TIMEOUT,
max_attributes: DEFAULT_MAX_ATTRIBUTES,
max_events: DEFAULT_MAX_EVENTS,
max_queue_size: DEFAULT_MAX_QUEUE_SIZE,
scheduled_delay: DEFAULT_SCHEDULED_DELAY,
max_export_batch_size: DEFAULT_MAX_EXPORT_BATCH_SIZE,
max_export_timeout: DEFAULT_MAX_EXPORT_TIMEOUT,
support_jaeger_propagator: false,
support_baggage_propagator: false,
compression: None,
}
}
}
/// Background service that sets up an OTLP tracer provider for one named
/// target and shuts it down when the server stops.
#[derive(Debug)]
pub struct TracerService {
/// Name used to register the provider and to derive the service name.
name: String,
/// Exporter endpoint, stored exactly as supplied (query string included).
endpoint: String,
/// Settings applied when the exporter and provider are created.
config: TracerConfig,
}
impl TracerService {
    /// Returns an empty builder; unset fields fall back to defaults when the
    /// service is built.
    pub fn builder() -> TracerServiceBuilder {
        Default::default()
    }

    /// Convenience constructor: creates a service for `name` that exports to
    /// `endpoint`. Query parameters on `endpoint` are interpreted as
    /// configuration overrides by the builder.
    pub fn new(name: &str, endpoint: &str) -> Self {
        let configured = TracerServiceBuilder::default()
            .name(name)
            .endpoint(endpoint);
        configured.build()
    }
}
/// Step-by-step builder for `TracerService`.
///
/// `name` and `endpoint` stay `None` until set; `build` substitutes
/// `"default"` and `"http://localhost:4317"` respectively.
#[derive(Default)]
pub struct TracerServiceBuilder {
/// Service name, if one was provided.
name: Option<String>,
/// Exporter endpoint, if one was provided.
endpoint: Option<String>,
/// Configuration accumulated so far (defaults plus query overrides).
config: TracerConfig,
}
impl TracerServiceBuilder {
pub fn name(mut self, name: &str) -> Self {
self.name = Some(name.to_string());
self
}
pub fn endpoint(mut self, endpoint: &str) -> Self {
self.endpoint = Some(endpoint.to_string());
if let Ok(info) = Url::parse(endpoint) {
self.parse_query_params(&info);
}
self
}
fn parse_query_params(&mut self, url: &Url) {
for (key, value) in url.query_pairs() {
match key.as_ref() {
"timeout" => {
if let Ok(v) = parse_duration(&value) {
self.config.timeout = v;
}
},
"max_queue_size" => {
if let Ok(v) = value.parse::<usize>() {
self.config.max_queue_size = v;
}
},
"scheduled_delay" => {
if let Ok(v) = parse_duration(&value) {
self.config.scheduled_delay = v;
}
},
"max_export_batch_size" => {
if let Ok(v) = value.parse::<usize>() {
self.config.max_export_batch_size = v;
}
},
"max_export_timeout" => {
if let Ok(v) = parse_duration(&value) {
self.config.max_export_timeout = v;
}
},
"max_attributes" => {
if let Ok(v) = value.parse::<u32>() {
self.config.max_attributes = v;
}
},
"max_events" => {
if let Ok(v) = value.parse::<u32>() {
self.config.max_events = v;
}
},
"jaeger" => {
self.config.support_jaeger_propagator = true;
},
"baggage" => {
self.config.support_baggage_propagator = true;
},
"compression" => {
if value.to_lowercase() == "zstd" {
self.config.compression = Some(Compression::Zstd);
} else {
self.config.compression = Some(Compression::Gzip);
}
},
_ => {},
}
}
}
pub fn build(self) -> TracerService {
TracerService {
name: self.name.unwrap_or_else(|| "default".to_string()),
endpoint: self
.endpoint
.unwrap_or_else(|| "http://localhost:4317".to_string()),
config: self.config,
}
}
}
/// Builds the service name reported in the trace resource by prefixing
/// `name` with `pingap:`.
#[inline]
fn get_service_name(name: &str) -> String {
    const PREFIX: &str = "pingap:";
    let mut service_name = String::with_capacity(PREFIX.len() + name.len());
    service_name.push_str(PREFIX);
    service_name.push_str(name);
    service_name
}
/// Returns an `http_proxy` tracer from the provider registered under `name`,
/// or `None` when no provider is registered for that name.
#[inline]
pub fn new_http_proxy_tracer(name: &str) -> Option<BoxedTracer> {
    let registered = provider::get_provider(name)?;
    Some(registered.tracer("http_proxy"))
}
#[async_trait]
impl BackgroundService for TracerService {
    /// Creates the OTLP exporter and tracer provider, registers the provider
    /// under this service's name, then waits for the shutdown signal and shuts
    /// the provider down.
    ///
    /// If the exporter cannot be built the failure is logged and the service
    /// returns immediately without registering anything.
    async fn start(&self, mut shutdown: ShutdownWatch) {
        let config = &self.config;

        // 1. Exporter: gRPC (tonic) transport, optional compression.
        let exporter_builder = opentelemetry_otlp::SpanExporter::builder()
            .with_tonic()
            .with_endpoint(&self.endpoint)
            .with_timeout(config.timeout);
        let exporter_builder = match config.compression {
            Some(compression) => exporter_builder.with_compression(compression),
            None => exporter_builder,
        };
        let exporter = match exporter_builder.build() {
            Ok(exporter) => exporter,
            Err(e) => {
                error!(
                    target: LOG_TARGET,
                    name = self.name,
                    error = %e,
                    "opentelemetry init fail"
                );
                return;
            },
        };

        // 2. Batch processor.
        // NOTE(review): `config.max_export_timeout` is not applied here —
        // confirm whether the SDK version in use exposes a setter for it.
        let batch_config = BatchConfigBuilder::default()
            .with_max_queue_size(config.max_queue_size)
            .with_scheduled_delay(config.scheduled_delay)
            .with_max_export_batch_size(config.max_export_batch_size)
            .build();
        let span_processor =
            opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter)
                .with_batch_config(batch_config)
                .build();

        // 3. Tracer provider: always sample, random ids, per-span limits.
        let resource = Resource::builder()
            .with_service_name(get_service_name(&self.name))
            .build();
        let tracer_provider =
            opentelemetry_sdk::trace::SdkTracerProvider::builder()
                .with_span_processor(span_processor)
                .with_sampler(Sampler::AlwaysOn)
                .with_id_generator(RandomIdGenerator::default())
                .with_max_attributes_per_span(config.max_attributes)
                .with_max_events_per_span(config.max_events)
                .with_resource(resource)
                .build();

        // 4. Propagators: trace-context always, the others on request.
        let mut propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>> =
            Vec::with_capacity(3);
        propagators.push(Box::new(TraceContextPropagator::new()));
        if config.support_jaeger_propagator {
            propagators
                .push(Box::new(opentelemetry_jaeger_propagator::Propagator::new()));
        }
        if config.support_baggage_propagator {
            propagators.push(Box::new(BaggagePropagator::new()));
        }
        // The composite propagator is installed through the global API.
        global::set_text_map_propagator(TextMapCompositePropagator::new(
            propagators,
        ));

        // 5. Publish the provider; keep a handle for the shutdown below.
        provider::add_provider(&self.name, tracer_provider.clone());
        info!(
            target: LOG_TARGET,
            name = self.name,
            endpoint = self.endpoint,
            support_jaeger_propagator = self.config.support_jaeger_propagator,
            support_baggage_propagator = self.config.support_baggage_propagator,
            "opentelemetry init success"
        );

        // 6. Park until the shutdown watch changes; its result is irrelevant.
        let _ = shutdown.changed().await;

        match tracer_provider.shutdown() {
            Ok(_) => {
                info!(
                    target: LOG_TARGET,
                    name = self.name,
                    "opentelemetry shutdown success"
                );
            },
            Err(e) => {
                error!(
                    target: LOG_TARGET,
                    name = self.name,
                    error = %e,
                    "opentelemetry shutdown fail"
                );
            },
        }
    }
}