Skip to main content

pingap_otel/
tracer.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::provider;
16use async_trait::async_trait;
17use humantime::parse_duration;
18use opentelemetry::{
19    global::{self, BoxedTracer},
20    propagation::{TextMapCompositePropagator, TextMapPropagator},
21    trace::TracerProvider,
22};
23use opentelemetry_otlp::{Compression, WithExportConfig, WithTonicConfig};
24use opentelemetry_sdk::{
25    Resource,
26    propagation::{BaggagePropagator, TraceContextPropagator},
27    trace::{BatchConfigBuilder, RandomIdGenerator, Sampler},
28};
29
30use pingora::{server::ShutdownWatch, services::background::BackgroundService};
31use std::time::Duration;
32use tracing::{error, info};
33use url::Url;
34
/// `tracing` target attached to every log line emitted by this module.
const LOG_TARGET: &str = "pingap::otel";

// Defaults used by `TracerConfig::default()`; each one can be overridden
// through a query parameter on the collector endpoint URL.

/// Default timeout for a single exporter request (3s).
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(3_000);
/// Default cap on the number of attributes recorded per span.
const DEFAULT_MAX_ATTRIBUTES: u32 = 16;
/// Default cap on the number of events recorded per span.
const DEFAULT_MAX_EVENTS: u32 = 16;
/// Default capacity of the pending-span queue.
const DEFAULT_MAX_QUEUE_SIZE: usize = 2 * 1024;
/// Default interval between scheduled batch exports (5s).
const DEFAULT_SCHEDULED_DELAY: Duration = Duration::from_millis(5_000);
/// Default maximum number of spans sent in one exported batch.
const DEFAULT_MAX_EXPORT_BATCH_SIZE: usize = 512;
/// Default timeout for exporting one batch (30s).
const DEFAULT_MAX_EXPORT_TIMEOUT: Duration = Duration::from_millis(30_000);
44
45/// Configuration for the tracer service
46#[derive(Debug, Clone)]
47pub struct TracerConfig {
48    /// Timeout duration for exporting spans
49    timeout: Duration,
50    /// Maximum number of attributes allowed per span
51    max_attributes: u32,
52    /// Maximum number of events allowed per span
53    max_events: u32,
54    /// Maximum size of the span queue before dropping
55    max_queue_size: usize,
56    /// Delay between scheduled exports of spans
57    scheduled_delay: Duration,
58    /// Maximum number of spans to export in a single batch
59    max_export_batch_size: usize,
60    /// Maximum timeout duration for exporting a batch
61    max_export_timeout: Duration,
62    /// Enable Jaeger propagation format support
63    support_jaeger_propagator: bool,
64    /// Enable W3C Baggage propagation format support
65    support_baggage_propagator: bool,
66    compression: Option<Compression>,
67}
68
69impl Default for TracerConfig {
70    fn default() -> Self {
71        Self {
72            timeout: DEFAULT_TIMEOUT,
73            max_attributes: DEFAULT_MAX_ATTRIBUTES,
74            max_events: DEFAULT_MAX_EVENTS,
75            max_queue_size: DEFAULT_MAX_QUEUE_SIZE,
76            scheduled_delay: DEFAULT_SCHEDULED_DELAY,
77            max_export_batch_size: DEFAULT_MAX_EXPORT_BATCH_SIZE,
78            max_export_timeout: DEFAULT_MAX_EXPORT_TIMEOUT,
79            support_jaeger_propagator: false,
80            support_baggage_propagator: false,
81            compression: None,
82        }
83    }
84}
85
86/// Service for managing OpenTelemetry tracing
87///
88/// This service handles the configuration and lifecycle of OpenTelemetry tracing,
89/// including span export to a collector endpoint.
90///
91/// # Fields
92/// * `name` - The service name used for identifying traces
93/// * `endpoint` - The OpenTelemetry collector endpoint URL
94/// * `config` - Configuration options for the tracer
95#[derive(Debug)]
96pub struct TracerService {
97    name: String,
98    endpoint: String,
99    config: TracerConfig,
100}
101
102impl TracerService {
103    /// Creates a new TracerService builder
104    pub fn builder() -> TracerServiceBuilder {
105        TracerServiceBuilder::default()
106    }
107
108    /// Creates a new TracerService with default configuration
109    pub fn new(name: &str, endpoint: &str) -> Self {
110        Self::builder().name(name).endpoint(endpoint).build()
111    }
112}
113
114/// Builder for TracerService
115#[derive(Default)]
116pub struct TracerServiceBuilder {
117    name: Option<String>,
118    endpoint: Option<String>,
119    config: TracerConfig,
120}
121
122impl TracerServiceBuilder {
123    /// Sets the service name for the tracer
124    ///
125    /// # Arguments
126    /// * `name` - The name of the service
127    pub fn name(mut self, name: &str) -> Self {
128        self.name = Some(name.to_string());
129        self
130    }
131
132    /// Sets the endpoint URL for the tracer and parses any configuration from query parameters
133    ///
134    /// # Arguments
135    /// * `endpoint` - The endpoint URL string
136    pub fn endpoint(mut self, endpoint: &str) -> Self {
137        self.endpoint = Some(endpoint.to_string());
138        if let Ok(info) = Url::parse(endpoint) {
139            self.parse_query_params(&info);
140        }
141        self
142    }
143
144    /// Parses configuration options from URL query parameters
145    ///
146    /// # Arguments
147    /// * `url` - The parsed URL containing query parameters
148    fn parse_query_params(&mut self, url: &Url) {
149        for (key, value) in url.query_pairs() {
150            match key.as_ref() {
151                "timeout" => {
152                    if let Ok(v) = parse_duration(&value) {
153                        self.config.timeout = v;
154                    }
155                },
156                "max_queue_size" => {
157                    if let Ok(v) = value.parse::<usize>() {
158                        self.config.max_queue_size = v;
159                    }
160                },
161                "scheduled_delay" => {
162                    if let Ok(v) = parse_duration(&value) {
163                        self.config.scheduled_delay = v;
164                    }
165                },
166                "max_export_batch_size" => {
167                    if let Ok(v) = value.parse::<usize>() {
168                        self.config.max_export_batch_size = v;
169                    }
170                },
171                "max_export_timeout" => {
172                    if let Ok(v) = parse_duration(&value) {
173                        self.config.max_export_timeout = v;
174                    }
175                },
176                "max_attributes" => {
177                    if let Ok(v) = value.parse::<u32>() {
178                        self.config.max_attributes = v;
179                    }
180                },
181                "max_events" => {
182                    if let Ok(v) = value.parse::<u32>() {
183                        self.config.max_events = v;
184                    }
185                },
186                "jaeger" => {
187                    self.config.support_jaeger_propagator = true;
188                },
189                "baggage" => {
190                    self.config.support_baggage_propagator = true;
191                },
192                "compression" => {
193                    if value.to_lowercase() == "zstd" {
194                        self.config.compression = Some(Compression::Zstd);
195                    } else {
196                        self.config.compression = Some(Compression::Gzip);
197                    }
198                },
199                _ => {},
200            }
201        }
202    }
203
204    /// Builds and returns a new TracerService with the configured options
205    pub fn build(self) -> TracerService {
206        TracerService {
207            name: self.name.unwrap_or_else(|| "default".to_string()),
208            endpoint: self
209                .endpoint
210                .unwrap_or_else(|| "http://localhost:4317".to_string()),
211            config: self.config,
212        }
213    }
214}
215
/// Returns the service name reported to the collector.
///
/// The base name is prefixed with `pingap:`, so `"api"` becomes `"pingap:api"`.
///
/// # Arguments
/// * `name` - Base service name
#[inline]
fn get_service_name(name: &str) -> String {
    const PREFIX: &str = "pingap:";
    let mut full = String::with_capacity(PREFIX.len() + name.len());
    full.push_str(PREFIX);
    full.push_str(name);
    full
}
224
225/// Creates a new BoxedTracer for the given service name
226///
227/// # Arguments
228/// * `name` - The service name to create a tracer for
229///
230/// # Returns
231/// * `Option<BoxedTracer>` - The created tracer if successful, None otherwise
232#[inline]
233pub fn new_http_proxy_tracer(name: &str) -> Option<BoxedTracer> {
234    if let Some(provider) = provider::get_provider(name) {
235        return Some(provider.tracer("http_proxy"));
236    }
237    None
238}
239
240#[async_trait]
241impl BackgroundService for TracerService {
242    /// Open telemetry background service, it will schedule export data to server.
243    async fn start(&self, mut shutdown: ShutdownWatch) {
244        let mut builder = opentelemetry_otlp::SpanExporter::builder()
245            .with_tonic()
246            .with_endpoint(&self.endpoint)
247            .with_timeout(self.config.timeout);
248        if let Some(compression) = self.config.compression {
249            builder = builder.with_compression(compression);
250        }
251
252        let result = builder.build().map(|exporter| {
253            let batch =
254                opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter)
255                    .with_batch_config(
256                        BatchConfigBuilder::default()
257                            .with_max_queue_size(self.config.max_queue_size)
258                            .with_scheduled_delay(self.config.scheduled_delay)
259                            .with_max_export_batch_size(
260                                self.config.max_export_batch_size,
261                            )
262                            // .with_max_export_timeout(
263                            //     self.config.max_export_timeout,
264                            // )
265                            .build(),
266                    )
267                    .build();
268            opentelemetry_sdk::trace::SdkTracerProvider::builder()
269                .with_span_processor(batch)
270                .with_sampler(Sampler::AlwaysOn)
271                .with_id_generator(RandomIdGenerator::default())
272                .with_max_attributes_per_span(self.config.max_attributes)
273                .with_max_events_per_span(self.config.max_events)
274                .with_resource(
275                    Resource::builder()
276                        .with_service_name(get_service_name(&self.name))
277                        .build(),
278                )
279                .build()
280        });
281
282        match result {
283            Ok(tracer_provider) => {
284                let mut propagators: Vec<
285                    Box<dyn TextMapPropagator + Send + Sync>,
286                > = vec![Box::new(TraceContextPropagator::new())];
287                if self.config.support_jaeger_propagator {
288                    propagators.push(Box::new(
289                        opentelemetry_jaeger_propagator::Propagator::new(),
290                    ));
291                }
292                if self.config.support_baggage_propagator {
293                    propagators.push(Box::new(BaggagePropagator::new()));
294                }
295                global::set_text_map_propagator(
296                    TextMapCompositePropagator::new(propagators),
297                );
298
299                // set tracer provider
300                provider::add_provider(&self.name, tracer_provider.clone());
301                info!(
302                    target: LOG_TARGET,
303                    name = self.name,
304                    endpoint = self.endpoint,
305                    support_jaeger_propagator =
306                        self.config.support_jaeger_propagator,
307                    support_baggage_propagator =
308                        self.config.support_baggage_propagator,
309                    "opentelemetry init success"
310                );
311
312                let _ = shutdown.changed().await;
313                if let Err(e) = tracer_provider.shutdown() {
314                    error!(
315                        target: LOG_TARGET,
316                        name = self.name,
317                        error = %e,
318                        "opentelemetry shutdown fail"
319                    );
320                } else {
321                    info!(
322                        target: LOG_TARGET,
323                        name = self.name,
324                        "opentelemetry shutdown success"
325                    );
326                }
327            },
328            Err(e) => {
329                error!(
330                    target: LOG_TARGET,
331                    name = self.name,
332                    error = %e,
333                    "opentelemetry init fail"
334                );
335            },
336        }
337    }
338}