1use super::provider;
16use async_trait::async_trait;
17use humantime::parse_duration;
18use opentelemetry::{
19 global::{self, BoxedTracer},
20 propagation::{TextMapCompositePropagator, TextMapPropagator},
21 trace::TracerProvider,
22};
23use opentelemetry_otlp::{Compression, WithExportConfig, WithTonicConfig};
24use opentelemetry_sdk::{
25 Resource,
26 propagation::{BaggagePropagator, TraceContextPropagator},
27 trace::{BatchConfigBuilder, RandomIdGenerator, Sampler},
28};
29
30use pingora::{server::ShutdownWatch, services::background::BackgroundService};
31use std::time::Duration;
32use tracing::{error, info};
33use url::Url;
34
/// Log target attached to every tracing event emitted by this module.
const LOG_TARGET: &str = "pingap::otel";

// Exporter defaults.

/// Default value of `TracerConfig::timeout` (passed to the span exporter).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(3);

// Per-span limit defaults.

/// Default value of `TracerConfig::max_attributes`.
const DEFAULT_MAX_ATTRIBUTES: u32 = 16;
/// Default value of `TracerConfig::max_events`.
const DEFAULT_MAX_EVENTS: u32 = 16;

// Batch span processor defaults.

/// Default value of `TracerConfig::max_queue_size`.
const DEFAULT_MAX_QUEUE_SIZE: usize = 2048;
/// Default value of `TracerConfig::scheduled_delay`.
const DEFAULT_SCHEDULED_DELAY: Duration = Duration::from_secs(5);
/// Default value of `TracerConfig::max_export_batch_size`.
const DEFAULT_MAX_EXPORT_BATCH_SIZE: usize = 512;
/// Default value of `TracerConfig::max_export_timeout`.
const DEFAULT_MAX_EXPORT_TIMEOUT: Duration = Duration::from_secs(30);
44
45#[derive(Debug, Clone)]
47pub struct TracerConfig {
48 timeout: Duration,
50 max_attributes: u32,
52 max_events: u32,
54 max_queue_size: usize,
56 scheduled_delay: Duration,
58 max_export_batch_size: usize,
60 max_export_timeout: Duration,
62 support_jaeger_propagator: bool,
64 support_baggage_propagator: bool,
66 compression: Option<Compression>,
67}
68
69impl Default for TracerConfig {
70 fn default() -> Self {
71 Self {
72 timeout: DEFAULT_TIMEOUT,
73 max_attributes: DEFAULT_MAX_ATTRIBUTES,
74 max_events: DEFAULT_MAX_EVENTS,
75 max_queue_size: DEFAULT_MAX_QUEUE_SIZE,
76 scheduled_delay: DEFAULT_SCHEDULED_DELAY,
77 max_export_batch_size: DEFAULT_MAX_EXPORT_BATCH_SIZE,
78 max_export_timeout: DEFAULT_MAX_EXPORT_TIMEOUT,
79 support_jaeger_propagator: false,
80 support_baggage_propagator: false,
81 compression: None,
82 }
83 }
84}
85
86#[derive(Debug)]
96pub struct TracerService {
97 name: String,
98 endpoint: String,
99 config: TracerConfig,
100}
101
102impl TracerService {
103 pub fn builder() -> TracerServiceBuilder {
105 TracerServiceBuilder::default()
106 }
107
108 pub fn new(name: &str, endpoint: &str) -> Self {
110 Self::builder().name(name).endpoint(endpoint).build()
111 }
112}
113
114#[derive(Default)]
116pub struct TracerServiceBuilder {
117 name: Option<String>,
118 endpoint: Option<String>,
119 config: TracerConfig,
120}
121
122impl TracerServiceBuilder {
123 pub fn name(mut self, name: &str) -> Self {
128 self.name = Some(name.to_string());
129 self
130 }
131
132 pub fn endpoint(mut self, endpoint: &str) -> Self {
137 self.endpoint = Some(endpoint.to_string());
138 if let Ok(info) = Url::parse(endpoint) {
139 self.parse_query_params(&info);
140 }
141 self
142 }
143
144 fn parse_query_params(&mut self, url: &Url) {
149 for (key, value) in url.query_pairs() {
150 match key.as_ref() {
151 "timeout" => {
152 if let Ok(v) = parse_duration(&value) {
153 self.config.timeout = v;
154 }
155 },
156 "max_queue_size" => {
157 if let Ok(v) = value.parse::<usize>() {
158 self.config.max_queue_size = v;
159 }
160 },
161 "scheduled_delay" => {
162 if let Ok(v) = parse_duration(&value) {
163 self.config.scheduled_delay = v;
164 }
165 },
166 "max_export_batch_size" => {
167 if let Ok(v) = value.parse::<usize>() {
168 self.config.max_export_batch_size = v;
169 }
170 },
171 "max_export_timeout" => {
172 if let Ok(v) = parse_duration(&value) {
173 self.config.max_export_timeout = v;
174 }
175 },
176 "max_attributes" => {
177 if let Ok(v) = value.parse::<u32>() {
178 self.config.max_attributes = v;
179 }
180 },
181 "max_events" => {
182 if let Ok(v) = value.parse::<u32>() {
183 self.config.max_events = v;
184 }
185 },
186 "jaeger" => {
187 self.config.support_jaeger_propagator = true;
188 },
189 "baggage" => {
190 self.config.support_baggage_propagator = true;
191 },
192 "compression" => {
193 if value.to_lowercase() == "zstd" {
194 self.config.compression = Some(Compression::Zstd);
195 } else {
196 self.config.compression = Some(Compression::Gzip);
197 }
198 },
199 _ => {},
200 }
201 }
202 }
203
204 pub fn build(self) -> TracerService {
206 TracerService {
207 name: self.name.unwrap_or_else(|| "default".to_string()),
208 endpoint: self
209 .endpoint
210 .unwrap_or_else(|| "http://localhost:4317".to_string()),
211 config: self.config,
212 }
213 }
214}
215
/// Returns the service name reported for `name`: the name prefixed with
/// `pingap:`.
#[inline]
fn get_service_name(name: &str) -> String {
    const PREFIX: &str = "pingap:";
    let mut service_name = String::with_capacity(PREFIX.len() + name.len());
    service_name.push_str(PREFIX);
    service_name.push_str(name);
    service_name
}
224
225#[inline]
233pub fn new_http_proxy_tracer(name: &str) -> Option<BoxedTracer> {
234 if let Some(provider) = provider::get_provider(name) {
235 return Some(provider.tracer("http_proxy"));
236 }
237 None
238}
239
240#[async_trait]
241impl BackgroundService for TracerService {
242 async fn start(&self, mut shutdown: ShutdownWatch) {
244 let mut builder = opentelemetry_otlp::SpanExporter::builder()
245 .with_tonic()
246 .with_endpoint(&self.endpoint)
247 .with_timeout(self.config.timeout);
248 if let Some(compression) = self.config.compression {
249 builder = builder.with_compression(compression);
250 }
251
252 let result = builder.build().map(|exporter| {
253 let batch =
254 opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter)
255 .with_batch_config(
256 BatchConfigBuilder::default()
257 .with_max_queue_size(self.config.max_queue_size)
258 .with_scheduled_delay(self.config.scheduled_delay)
259 .with_max_export_batch_size(
260 self.config.max_export_batch_size,
261 )
262 .build(),
266 )
267 .build();
268 opentelemetry_sdk::trace::SdkTracerProvider::builder()
269 .with_span_processor(batch)
270 .with_sampler(Sampler::AlwaysOn)
271 .with_id_generator(RandomIdGenerator::default())
272 .with_max_attributes_per_span(self.config.max_attributes)
273 .with_max_events_per_span(self.config.max_events)
274 .with_resource(
275 Resource::builder()
276 .with_service_name(get_service_name(&self.name))
277 .build(),
278 )
279 .build()
280 });
281
282 match result {
283 Ok(tracer_provider) => {
284 let mut propagators: Vec<
285 Box<dyn TextMapPropagator + Send + Sync>,
286 > = vec![Box::new(TraceContextPropagator::new())];
287 if self.config.support_jaeger_propagator {
288 propagators.push(Box::new(
289 opentelemetry_jaeger_propagator::Propagator::new(),
290 ));
291 }
292 if self.config.support_baggage_propagator {
293 propagators.push(Box::new(BaggagePropagator::new()));
294 }
295 global::set_text_map_propagator(
296 TextMapCompositePropagator::new(propagators),
297 );
298
299 provider::add_provider(&self.name, tracer_provider.clone());
301 info!(
302 target: LOG_TARGET,
303 name = self.name,
304 endpoint = self.endpoint,
305 support_jaeger_propagator =
306 self.config.support_jaeger_propagator,
307 support_baggage_propagator =
308 self.config.support_baggage_propagator,
309 "opentelemetry init success"
310 );
311
312 let _ = shutdown.changed().await;
313 if let Err(e) = tracer_provider.shutdown() {
314 error!(
315 target: LOG_TARGET,
316 name = self.name,
317 error = %e,
318 "opentelemetry shutdown fail"
319 );
320 } else {
321 info!(
322 target: LOG_TARGET,
323 name = self.name,
324 "opentelemetry shutdown success"
325 );
326 }
327 },
328 Err(e) => {
329 error!(
330 target: LOG_TARGET,
331 name = self.name,
332 error = %e,
333 "opentelemetry init fail"
334 );
335 },
336 }
337 }
338}