opentelemetry_otlp/exporter/http/
mod.rs

1use super::{
2    default_headers, default_protocol, parse_header_string, resolve_timeout, ExporterBuildError,
3    OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT,
4};
5use crate::{ExportConfig, Protocol, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS};
6use http::{HeaderName, HeaderValue, Uri};
7use opentelemetry::otel_debug;
8use opentelemetry_http::HttpClient;
9use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
10#[cfg(feature = "logs")]
11use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
12#[cfg(feature = "trace")]
13use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
14#[cfg(feature = "logs")]
15use opentelemetry_sdk::logs::LogBatch;
16#[cfg(feature = "trace")]
17use opentelemetry_sdk::trace::SpanData;
18use prost::Message;
19use std::collections::HashMap;
20use std::env;
21use std::str::FromStr;
22use std::sync::{Arc, Mutex};
23use std::time::Duration;
24
25#[cfg(feature = "metrics")]
26mod metrics;
27
28#[cfg(feature = "metrics")]
29use opentelemetry_sdk::metrics::data::ResourceMetrics;
30
31#[cfg(feature = "logs")]
32pub(crate) mod logs;
33
34#[cfg(feature = "trace")]
35mod trace;
36
37#[cfg(all(
38    not(feature = "reqwest-client"),
39    not(feature = "reqwest-blocking-client"),
40    feature = "hyper-client"
41))]
42use opentelemetry_http::hyper::HyperClient;
43
44/// Configuration of the http transport
45#[derive(Debug, Default)]
46pub struct HttpConfig {
47    /// Select the HTTP client
48    client: Option<Arc<dyn HttpClient>>,
49
50    /// Additional headers to send to the OTLP endpoint.
51    headers: Option<HashMap<String, String>>,
52
53    /// The compression algorithm to use when communicating with the OTLP endpoint.
54    compression: Option<crate::Compression>,
55}
56
57/// Configuration for the OTLP HTTP exporter.
58///
59/// ## Examples
60///
61/// ```no_run
62/// # #[cfg(feature="metrics")]
63/// use opentelemetry_sdk::metrics::Temporality;
64///
65/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
66/// // Create a span exporter you can use when configuring tracer providers
67/// # #[cfg(feature="trace")]
68/// let span_exporter = opentelemetry_otlp::SpanExporter::builder().with_http().build()?;
69///
70/// // Create a metrics exporter you can use when configuring meter providers
71/// # #[cfg(feature="metrics")]
72/// let metrics_exporter = opentelemetry_otlp::MetricExporter::builder()
73///     .with_http()
74///     .with_temporality(Temporality::default())
75///     .build()?;
76///
77/// // Create a log exporter you can use when configuring logger providers
78/// # #[cfg(feature="logs")]
79/// let log_exporter = opentelemetry_otlp::LogExporter::builder().with_http().build()?;
80/// # Ok(())
81/// # }
82/// ```
83///
84#[derive(Debug)]
85pub struct HttpExporterBuilder {
86    pub(crate) exporter_config: ExportConfig,
87    pub(crate) http_config: HttpConfig,
88}
89
90impl Default for HttpExporterBuilder {
91    fn default() -> Self {
92        HttpExporterBuilder {
93            exporter_config: ExportConfig {
94                protocol: default_protocol(),
95                ..ExportConfig::default()
96            },
97            http_config: HttpConfig {
98                headers: Some(default_headers()),
99                ..HttpConfig::default()
100            },
101        }
102    }
103}
104
105impl HttpExporterBuilder {
106    fn build_client(
107        &mut self,
108        signal_endpoint_var: &str,
109        signal_endpoint_path: &str,
110        signal_timeout_var: &str,
111        signal_http_headers_var: &str,
112        signal_compression_var: &str,
113    ) -> Result<OtlpHttpClient, ExporterBuildError> {
114        let endpoint = resolve_http_endpoint(
115            signal_endpoint_var,
116            signal_endpoint_path,
117            self.exporter_config.endpoint.as_deref(),
118        )?;
119
120        let compression = self.resolve_compression(signal_compression_var)?;
121
122        // Validate compression is supported at build time
123        if let Some(compression_alg) = &compression {
124            match compression_alg {
125                crate::Compression::Gzip => {
126                    #[cfg(not(feature = "gzip-http"))]
127                    {
128                        return Err(ExporterBuildError::UnsupportedCompressionAlgorithm(
129                            "gzip compression requested but gzip-http feature not enabled"
130                                .to_string(),
131                        ));
132                    }
133                }
134                crate::Compression::Zstd => {
135                    #[cfg(not(feature = "zstd-http"))]
136                    {
137                        return Err(ExporterBuildError::UnsupportedCompressionAlgorithm(
138                            "zstd compression requested but zstd-http feature not enabled"
139                                .to_string(),
140                        ));
141                    }
142                }
143            }
144        }
145
146        let timeout = resolve_timeout(signal_timeout_var, self.exporter_config.timeout.as_ref());
147
148        #[allow(unused_mut)] // TODO - clippy thinks mut is not needed, but it is
149        let mut http_client = self.http_config.client.take();
150
151        if http_client.is_none() {
152            #[cfg(all(
153                not(feature = "reqwest-client"),
154                not(feature = "reqwest-blocking-client"),
155                feature = "hyper-client"
156            ))]
157            {
158                // TODO - support configuring custom connector and executor
159                http_client = Some(Arc::new(HyperClient::with_default_connector(timeout, None))
160                    as Arc<dyn HttpClient>);
161            }
162            #[cfg(all(
163                not(feature = "hyper-client"),
164                not(feature = "reqwest-blocking-client"),
165                feature = "reqwest-client"
166            ))]
167            {
168                http_client = Some(Arc::new(
169                    reqwest::Client::builder()
170                        .timeout(timeout)
171                        .build()
172                        .unwrap_or_default(),
173                ) as Arc<dyn HttpClient>);
174            }
175            #[cfg(all(
176                not(feature = "hyper-client"),
177                not(feature = "reqwest-client"),
178                feature = "reqwest-blocking-client"
179            ))]
180            {
181                let timeout_clone = timeout;
182                http_client = Some(Arc::new(
183                    std::thread::spawn(move || {
184                        reqwest::blocking::Client::builder()
185                            .timeout(timeout_clone)
186                            .build()
187                            .unwrap_or_else(|_| reqwest::blocking::Client::new())
188                    })
189                    .join()
190                    .unwrap(), // TODO: Return ExporterBuildError::ThreadSpawnFailed
191                ) as Arc<dyn HttpClient>);
192            }
193        }
194
195        let http_client = http_client.ok_or(ExporterBuildError::NoHttpClient)?;
196
197        #[allow(clippy::mutable_key_type)] // http headers are not mutated
198        let mut headers: HashMap<HeaderName, HeaderValue> = self
199            .http_config
200            .headers
201            .take()
202            .unwrap_or_default()
203            .into_iter()
204            .filter_map(|(k, v)| {
205                Some((
206                    HeaderName::from_str(&k).ok()?,
207                    HeaderValue::from_str(&v).ok()?,
208                ))
209            })
210            .collect();
211
212        // read headers from env var - signal specific env var is preferred over general
213        if let Ok(input) =
214            env::var(signal_http_headers_var).or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
215        {
216            add_header_from_string(&input, &mut headers);
217        }
218
219        Ok(OtlpHttpClient::new(
220            http_client,
221            endpoint,
222            headers,
223            self.exporter_config.protocol,
224            timeout,
225            compression,
226        ))
227    }
228
229    fn resolve_compression(
230        &self,
231        env_override: &str,
232    ) -> Result<Option<crate::Compression>, super::ExporterBuildError> {
233        super::resolve_compression_from_env(self.http_config.compression, env_override)
234    }
235
236    /// Create a log exporter with the current configuration
237    #[cfg(feature = "trace")]
238    pub fn build_span_exporter(mut self) -> Result<crate::SpanExporter, ExporterBuildError> {
239        use crate::{
240            OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
241            OTEL_EXPORTER_OTLP_TRACES_HEADERS, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
242        };
243
244        let client = self.build_client(
245            OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
246            "/v1/traces",
247            OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
248            OTEL_EXPORTER_OTLP_TRACES_HEADERS,
249            OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
250        )?;
251
252        Ok(crate::SpanExporter::from_http(client))
253    }
254
255    /// Create a log exporter with the current configuration
256    #[cfg(feature = "logs")]
257    pub fn build_log_exporter(mut self) -> Result<crate::LogExporter, ExporterBuildError> {
258        use crate::{
259            OTEL_EXPORTER_OTLP_LOGS_COMPRESSION, OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
260            OTEL_EXPORTER_OTLP_LOGS_HEADERS, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
261        };
262
263        let client = self.build_client(
264            OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
265            "/v1/logs",
266            OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
267            OTEL_EXPORTER_OTLP_LOGS_HEADERS,
268            OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
269        )?;
270
271        Ok(crate::LogExporter::from_http(client))
272    }
273
274    /// Create a metrics exporter with the current configuration
275    #[cfg(feature = "metrics")]
276    pub fn build_metrics_exporter(
277        mut self,
278        temporality: opentelemetry_sdk::metrics::Temporality,
279    ) -> Result<crate::MetricExporter, ExporterBuildError> {
280        use crate::{
281            OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
282            OTEL_EXPORTER_OTLP_METRICS_HEADERS, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
283        };
284
285        let client = self.build_client(
286            OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
287            "/v1/metrics",
288            OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
289            OTEL_EXPORTER_OTLP_METRICS_HEADERS,
290            OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
291        )?;
292
293        Ok(crate::MetricExporter::from_http(client, temporality))
294    }
295}
296
297#[derive(Debug)]
298pub(crate) struct OtlpHttpClient {
299    client: Mutex<Option<Arc<dyn HttpClient>>>,
300    collector_endpoint: Uri,
301    headers: HashMap<HeaderName, HeaderValue>,
302    protocol: Protocol,
303    _timeout: Duration,
304    compression: Option<crate::Compression>,
305    #[allow(dead_code)]
306    // <allow dead> would be removed once we support set_resource for metrics and traces.
307    resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
308}
309
310impl OtlpHttpClient {
311    /// Compress data using gzip or zstd if the user has requested it and the relevant feature
312    /// has been enabled. If the user has requested it but the feature has not been enabled,
313    /// we should catch this at exporter build time and never get here.
314    fn process_body(&self, body: Vec<u8>) -> Result<(Vec<u8>, Option<&'static str>), String> {
315        match self.compression {
316            #[cfg(feature = "gzip-http")]
317            Some(crate::Compression::Gzip) => {
318                use flate2::{write::GzEncoder, Compression};
319                use std::io::Write;
320
321                let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
322                encoder.write_all(&body).map_err(|e| e.to_string())?;
323                let compressed = encoder.finish().map_err(|e| e.to_string())?;
324                Ok((compressed, Some("gzip")))
325            }
326            #[cfg(not(feature = "gzip-http"))]
327            Some(crate::Compression::Gzip) => {
328                Err("gzip compression requested but gzip-http feature not enabled".to_string())
329            }
330            #[cfg(feature = "zstd-http")]
331            Some(crate::Compression::Zstd) => {
332                let compressed = zstd::bulk::compress(&body, 0).map_err(|e| e.to_string())?;
333                Ok((compressed, Some("zstd")))
334            }
335            #[cfg(not(feature = "zstd-http"))]
336            Some(crate::Compression::Zstd) => {
337                Err("zstd compression requested but zstd-http feature not enabled".to_string())
338            }
339            None => Ok((body, None)),
340        }
341    }
342
343    #[allow(clippy::mutable_key_type)] // http headers are not mutated
344    fn new(
345        client: Arc<dyn HttpClient>,
346        collector_endpoint: Uri,
347        headers: HashMap<HeaderName, HeaderValue>,
348        protocol: Protocol,
349        timeout: Duration,
350        compression: Option<crate::Compression>,
351    ) -> Self {
352        OtlpHttpClient {
353            client: Mutex::new(Some(client)),
354            collector_endpoint,
355            headers,
356            protocol,
357            _timeout: timeout,
358            compression,
359            resource: ResourceAttributesWithSchema::default(),
360        }
361    }
362
363    #[cfg(feature = "trace")]
364    fn build_trace_export_body(
365        &self,
366        spans: Vec<SpanData>,
367    ) -> Result<(Vec<u8>, &'static str, Option<&'static str>), String> {
368        use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
369        let resource_spans = group_spans_by_resource_and_scope(spans, &self.resource);
370
371        let req = ExportTraceServiceRequest { resource_spans };
372        let (body, content_type) = match self.protocol {
373            #[cfg(feature = "http-json")]
374            Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
375                Ok(json) => (json.into_bytes(), "application/json"),
376                Err(e) => return Err(e.to_string()),
377            },
378            _ => (req.encode_to_vec(), "application/x-protobuf"),
379        };
380
381        let (processed_body, content_encoding) = self.process_body(body)?;
382        Ok((processed_body, content_type, content_encoding))
383    }
384
385    #[cfg(feature = "logs")]
386    fn build_logs_export_body(
387        &self,
388        logs: LogBatch<'_>,
389    ) -> Result<(Vec<u8>, &'static str, Option<&'static str>), String> {
390        use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
391        let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
392        let req = ExportLogsServiceRequest { resource_logs };
393
394        let (body, content_type) = match self.protocol {
395            #[cfg(feature = "http-json")]
396            Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
397                Ok(json) => (json.into_bytes(), "application/json"),
398                Err(e) => return Err(e.to_string()),
399            },
400            _ => (req.encode_to_vec(), "application/x-protobuf"),
401        };
402
403        let (processed_body, content_encoding) = self.process_body(body)?;
404        Ok((processed_body, content_type, content_encoding))
405    }
406
407    #[cfg(feature = "metrics")]
408    fn build_metrics_export_body(
409        &self,
410        metrics: &ResourceMetrics,
411    ) -> Option<(Vec<u8>, &'static str, Option<&'static str>)> {
412        use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
413
414        let req: ExportMetricsServiceRequest = metrics.into();
415
416        let (body, content_type) = match self.protocol {
417            #[cfg(feature = "http-json")]
418            Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
419                Ok(json) => (json.into_bytes(), "application/json"),
420                Err(e) => {
421                    otel_debug!(name: "JsonSerializationFaied", error = e.to_string());
422                    return None;
423                }
424            },
425            _ => (req.encode_to_vec(), "application/x-protobuf"),
426        };
427
428        match self.process_body(body) {
429            Ok((processed_body, content_encoding)) => {
430                Some((processed_body, content_type, content_encoding))
431            }
432            Err(e) => {
433                otel_debug!(name: "CompressionFailed", error = e);
434                None
435            }
436        }
437    }
438}
439
440fn build_endpoint_uri(endpoint: &str, path: &str) -> Result<Uri, ExporterBuildError> {
441    let path = if endpoint.ends_with('/') && path.starts_with('/') {
442        path.strip_prefix('/').unwrap()
443    } else {
444        path
445    };
446    let endpoint = format!("{endpoint}{path}");
447    endpoint.parse().map_err(|er: http::uri::InvalidUri| {
448        ExporterBuildError::InvalidUri(endpoint, er.to_string())
449    })
450}
451
452// see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md#endpoint-urls-for-otlphttp
453fn resolve_http_endpoint(
454    signal_endpoint_var: &str,
455    signal_endpoint_path: &str,
456    provided_endpoint: Option<&str>,
457) -> Result<Uri, ExporterBuildError> {
458    // programmatic configuration overrides any value set via environment variables
459    if let Some(provider_endpoint) = provided_endpoint.filter(|s| !s.is_empty()) {
460        provider_endpoint
461            .parse()
462            .map_err(|er: http::uri::InvalidUri| {
463                ExporterBuildError::InvalidUri(provider_endpoint.to_string(), er.to_string())
464            })
465    } else if let Some(endpoint) = env::var(signal_endpoint_var)
466        .ok()
467        .and_then(|s| s.parse().ok())
468    {
469        // per signal env var is not modified
470        Ok(endpoint)
471    } else if let Some(endpoint) = env::var(OTEL_EXPORTER_OTLP_ENDPOINT)
472        .ok()
473        .and_then(|s| build_endpoint_uri(&s, signal_endpoint_path).ok())
474    {
475        // if signal env var is not set, then we check if the OTEL_EXPORTER_OTLP_ENDPOINT env var is set
476        Ok(endpoint)
477    } else {
478        build_endpoint_uri(
479            OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT,
480            signal_endpoint_path,
481        )
482    }
483}
484
485#[allow(clippy::mutable_key_type)] // http headers are not mutated
486fn add_header_from_string(input: &str, headers: &mut HashMap<HeaderName, HeaderValue>) {
487    headers.extend(parse_header_string(input).filter_map(|(key, value)| {
488        Some((
489            HeaderName::from_str(key).ok()?,
490            HeaderValue::from_str(&value).ok()?,
491        ))
492    }));
493}
494
495/// Expose interface for modifying builder config.
496pub trait HasHttpConfig {
497    /// Return a mutable reference to the config within the exporter builders.
498    fn http_client_config(&mut self) -> &mut HttpConfig;
499}
500
501/// Expose interface for modifying builder config.
502impl HasHttpConfig for HttpExporterBuilder {
503    fn http_client_config(&mut self) -> &mut HttpConfig {
504        &mut self.http_config
505    }
506}
507
508/// This trait will be implemented for every struct that implemented [`HasHttpConfig`] trait.
509///
510/// ## Examples
511/// ```
512/// # #[cfg(all(feature = "trace", feature = "grpc-tonic"))]
513/// # {
514/// use crate::opentelemetry_otlp::WithHttpConfig;
515/// let exporter_builder = opentelemetry_otlp::SpanExporter::builder()
516///     .with_http()
517///     .with_headers(std::collections::HashMap::new());
518/// # }
519/// ```
520pub trait WithHttpConfig {
521    /// Assign client implementation
522    fn with_http_client<T: HttpClient + 'static>(self, client: T) -> Self;
523
524    /// Set additional headers to send to the collector.
525    fn with_headers(self, headers: HashMap<String, String>) -> Self;
526
527    /// Set the compression algorithm to use when communicating with the collector.
528    fn with_compression(self, compression: crate::Compression) -> Self;
529}
530
531impl<B: HasHttpConfig> WithHttpConfig for B {
532    fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
533        self.http_client_config().client = Some(Arc::new(client));
534        self
535    }
536
537    fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
538        // headers will be wrapped, so we must do some logic to unwrap first.
539        let http_client_headers = self
540            .http_client_config()
541            .headers
542            .get_or_insert(HashMap::new());
543        headers.into_iter().for_each(|(key, value)| {
544            http_client_headers.insert(key, super::url_decode(&value).unwrap_or(value));
545        });
546        self
547    }
548
549    fn with_compression(mut self, compression: crate::Compression) -> Self {
550        self.http_client_config().compression = Some(compression);
551        self
552    }
553}
554
555#[cfg(test)]
556mod tests {
557    use crate::exporter::http::HttpConfig;
558    use crate::exporter::tests::run_env_test;
559    use crate::{
560        HttpExporterBuilder, WithExportConfig, WithHttpConfig, OTEL_EXPORTER_OTLP_ENDPOINT,
561        OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
562    };
563
564    use super::{build_endpoint_uri, resolve_http_endpoint};
565
566    #[test]
567    fn test_append_signal_path_to_generic_env() {
568        run_env_test(
569            vec![(OTEL_EXPORTER_OTLP_ENDPOINT, "http://example.com")],
570            || {
571                let endpoint =
572                    resolve_http_endpoint(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "/v1/traces", None)
573                        .unwrap();
574                assert_eq!(endpoint, "http://example.com/v1/traces");
575            },
576        )
577    }
578
579    #[test]
580    fn test_not_append_signal_path_to_signal_env() {
581        run_env_test(
582            vec![(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://example.com")],
583            || {
584                let endpoint =
585                    resolve_http_endpoint(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "/v1/traces", None)
586                        .unwrap();
587                assert_eq!(endpoint, "http://example.com");
588            },
589        )
590    }
591
592    #[test]
593    fn test_priority_of_signal_env_over_generic_env() {
594        run_env_test(
595            vec![
596                (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://example.com"),
597                (OTEL_EXPORTER_OTLP_ENDPOINT, "http://wrong.com"),
598            ],
599            || {
600                let endpoint = super::resolve_http_endpoint(
601                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
602                    "/v1/traces",
603                    None,
604                )
605                .unwrap();
606                assert_eq!(endpoint, "http://example.com");
607            },
608        );
609    }
610
611    #[test]
612    fn test_priority_of_code_based_config_over_envs() {
613        run_env_test(
614            vec![
615                (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://example.com"),
616                (OTEL_EXPORTER_OTLP_ENDPOINT, "http://wrong.com"),
617            ],
618            || {
619                let endpoint = super::resolve_http_endpoint(
620                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
621                    "/v1/traces",
622                    Some("http://localhost:4317"),
623                )
624                .unwrap();
625                assert_eq!(endpoint, "http://localhost:4317");
626            },
627        );
628    }
629
630    #[test]
631    fn test_use_default_when_empty_string_for_option() {
632        run_env_test(vec![], || {
633            let endpoint =
634                super::resolve_http_endpoint("non_existent_var", "/v1/traces", Some("")).unwrap();
635            assert_eq!(endpoint, "http://localhost:4318/v1/traces");
636        });
637    }
638
639    #[test]
640    fn test_use_default_when_others_missing() {
641        run_env_test(vec![], || {
642            let endpoint =
643                super::resolve_http_endpoint("NON_EXISTENT_VAR", "/v1/traces", None).unwrap();
644            assert_eq!(endpoint, "http://localhost:4318/v1/traces");
645        });
646    }
647
648    #[test]
649    fn test_build_endpoint_uri() {
650        let uri = build_endpoint_uri("https://example.com", "/v1/traces").unwrap();
651        assert_eq!(uri, "https://example.com/v1/traces");
652
653        // Should be no duplicate slahes:
654        let uri = build_endpoint_uri("https://example.com/", "/v1/traces").unwrap();
655        assert_eq!(uri, "https://example.com/v1/traces");
656
657        // Append paths properly:
658        let uri = build_endpoint_uri("https://example.com/additional/path/", "/v1/traces").unwrap();
659        assert_eq!(uri, "https://example.com/additional/path/v1/traces");
660    }
661
662    #[test]
663    fn test_invalid_uri_in_signal_env_falls_back_to_generic_env() {
664        run_env_test(
665            vec![
666                (
667                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
668                    "-*/*-/*-//-/-/invalid-uri",
669                ),
670                (OTEL_EXPORTER_OTLP_ENDPOINT, "http://example.com"),
671            ],
672            || {
673                let endpoint = super::resolve_http_endpoint(
674                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
675                    "/v1/traces",
676                    None,
677                )
678                .unwrap();
679                assert_eq!(endpoint, "http://example.com/v1/traces");
680            },
681        );
682    }
683
684    #[test]
685    fn test_all_invalid_urls_falls_back_to_error() {
686        run_env_test(vec![], || {
687            let result = super::resolve_http_endpoint(
688                OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
689                "/v1/traces",
690                Some("-*/*-/*-//-/-/yet-another-invalid-uri"),
691            );
692            assert!(result.is_err());
693            // You may also want to assert on the specific error type if applicable
694        });
695    }
696
697    #[test]
698    fn test_add_header_from_string() {
699        use http::{HeaderName, HeaderValue};
700        use std::collections::HashMap;
701        let test_cases = vec![
702            // Format: (input_str, expected_headers)
703            ("k1=v1", vec![("k1", "v1")]),
704            ("k1=v1,k2=v2", vec![("k1", "v1"), ("k2", "v2")]),
705            ("k1=v1=10,k2,k3", vec![("k1", "v1=10")]),
706            ("k1=v1,,,k2,k3=10", vec![("k1", "v1"), ("k3", "10")]),
707        ];
708
709        for (input_str, expected_headers) in test_cases {
710            #[allow(clippy::mutable_key_type)] // http headers are not mutated
711            let mut headers: HashMap<HeaderName, HeaderValue> = HashMap::new();
712            super::add_header_from_string(input_str, &mut headers);
713
714            assert_eq!(
715                headers.len(),
716                expected_headers.len(),
717                "Failed on input: {input_str}"
718            );
719
720            for (expected_key, expected_value) in expected_headers {
721                assert_eq!(
722                    headers.get(&HeaderName::from_static(expected_key)),
723                    Some(&HeaderValue::from_static(expected_value)),
724                    "Failed on key: {expected_key} with input: {input_str}"
725                );
726            }
727        }
728    }
729
730    #[test]
731    fn test_merge_header_from_string() {
732        use http::{HeaderName, HeaderValue};
733        use std::collections::HashMap;
734        #[allow(clippy::mutable_key_type)] // http headers are not mutated
735        let mut headers: HashMap<HeaderName, HeaderValue> = std::collections::HashMap::new();
736        headers.insert(
737            HeaderName::from_static("k1"),
738            HeaderValue::from_static("v1"),
739        );
740        headers.insert(
741            HeaderName::from_static("k2"),
742            HeaderValue::from_static("v2"),
743        );
744        let test_cases = vec![
745            // Format: (input_str, expected_headers)
746            ("k1=v1_new", vec![("k1", "v1_new"), ("k2", "v2")]),
747            (
748                "k3=val=10,22,34,k4=,k5=10",
749                vec![
750                    ("k1", "v1_new"),
751                    ("k2", "v2"),
752                    ("k3", "val=10"),
753                    ("k5", "10"),
754                ],
755            ),
756        ];
757
758        for (input_str, expected_headers) in test_cases {
759            super::add_header_from_string(input_str, &mut headers);
760
761            assert_eq!(
762                headers.len(),
763                expected_headers.len(),
764                "Failed on input: {input_str}"
765            );
766
767            for (expected_key, expected_value) in expected_headers {
768                assert_eq!(
769                    headers.get(&HeaderName::from_static(expected_key)),
770                    Some(&HeaderValue::from_static(expected_value)),
771                    "Failed on key: {expected_key} with input: {input_str}"
772                );
773            }
774        }
775    }
776
777    #[test]
778    fn test_http_exporter_builder_with_headers() {
779        use std::collections::HashMap;
780        // Arrange
781        let initial_headers = HashMap::from([("k1".to_string(), "v1".to_string())]);
782        let extra_headers = HashMap::from([
783            ("k2".to_string(), "v2".to_string()),
784            ("k3".to_string(), "v3".to_string()),
785        ]);
786        let expected_headers = initial_headers.iter().chain(extra_headers.iter()).fold(
787            HashMap::new(),
788            |mut acc, (k, v)| {
789                acc.insert(k.clone(), v.clone());
790                acc
791            },
792        );
793        let builder = HttpExporterBuilder {
794            http_config: HttpConfig {
795                client: None,
796                headers: Some(initial_headers),
797                compression: None,
798            },
799            exporter_config: crate::ExportConfig::default(),
800        };
801
802        // Act
803        let builder = builder.with_headers(extra_headers);
804
805        // Assert
806        assert_eq!(
807            builder
808                .http_config
809                .headers
810                .clone()
811                .expect("headers should always be Some"),
812            expected_headers,
813        );
814    }
815
816    #[test]
817    fn test_http_exporter_endpoint() {
818        // default endpoint should add signal path
819        run_env_test(vec![], || {
820            let exporter = HttpExporterBuilder::default();
821
822            let url = resolve_http_endpoint(
823                OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
824                "/v1/traces",
825                exporter.exporter_config.endpoint.as_deref(),
826            )
827            .unwrap();
828
829            assert_eq!(url, "http://localhost:4318/v1/traces");
830        });
831
832        // if builder endpoint is set, it should not add signal path
833        run_env_test(vec![], || {
834            let exporter = HttpExporterBuilder::default()
835                .with_endpoint("http://localhost:4318/v1/tracesbutnotreally");
836
837            let url = resolve_http_endpoint(
838                OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
839                "/v1/traces",
840                exporter.exporter_config.endpoint.as_deref(),
841            )
842            .unwrap();
843
844            assert_eq!(url, "http://localhost:4318/v1/tracesbutnotreally");
845        });
846    }
847
848    #[cfg(feature = "gzip-http")]
849    mod compression_tests {
850        use super::super::OtlpHttpClient;
851        use flate2::read::GzDecoder;
852        use opentelemetry_http::{Bytes, HttpClient};
853        use std::io::Read;
854
855        #[test]
856        fn test_gzip_compression_and_decompression() {
857            let client = OtlpHttpClient::new(
858                std::sync::Arc::new(MockHttpClient),
859                "http://localhost:4318".parse().unwrap(),
860                std::collections::HashMap::new(),
861                crate::Protocol::HttpBinary,
862                std::time::Duration::from_secs(10),
863                Some(crate::Compression::Gzip),
864            );
865
866            // Test with some sample data
867            let test_data = b"Hello, world! This is test data for compression.";
868            let result = client.process_body(test_data.to_vec()).unwrap();
869            let (compressed_body, content_encoding) = result;
870
871            // Verify encoding header is set
872            assert_eq!(content_encoding, Some("gzip"));
873
874            // Verify we can decompress the body
875            let mut decoder = GzDecoder::new(&compressed_body[..]);
876            let mut decompressed = Vec::new();
877            decoder.read_to_end(&mut decompressed).unwrap();
878
879            // Verify decompressed data matches original
880            assert_eq!(decompressed, test_data);
881            // Verify compression actually happened (compressed should be different)
882            assert_ne!(compressed_body, test_data.to_vec());
883        }
884
885        #[cfg(feature = "zstd-http")]
886        #[test]
887        fn test_zstd_compression_and_decompression() {
888            let client = OtlpHttpClient::new(
889                std::sync::Arc::new(MockHttpClient),
890                "http://localhost:4318".parse().unwrap(),
891                std::collections::HashMap::new(),
892                crate::Protocol::HttpBinary,
893                std::time::Duration::from_secs(10),
894                Some(crate::Compression::Zstd),
895            );
896
897            // Test with some sample data
898            let test_data = b"Hello, world! This is test data for zstd compression.";
899            let result = client.process_body(test_data.to_vec()).unwrap();
900            let (compressed_body, content_encoding) = result;
901
902            // Verify encoding header is set
903            assert_eq!(content_encoding, Some("zstd"));
904
905            // Verify we can decompress the body
906            let decompressed = zstd::bulk::decompress(&compressed_body, test_data.len()).unwrap();
907
908            // Verify decompressed data matches original
909            assert_eq!(decompressed, test_data);
910            // Verify compression actually happened (compressed should be different)
911            assert_ne!(compressed_body, test_data.to_vec());
912        }
913
914        #[test]
915        fn test_no_compression_when_disabled() {
916            let client = OtlpHttpClient::new(
917                std::sync::Arc::new(MockHttpClient),
918                "http://localhost:4318".parse().unwrap(),
919                std::collections::HashMap::new(),
920                crate::Protocol::HttpBinary,
921                std::time::Duration::from_secs(10),
922                None, // No compression
923            );
924
925            let body = vec![1, 2, 3, 4];
926            let result = client.process_body(body.clone()).unwrap();
927            let (result_body, content_encoding) = result;
928
929            // Body should be unchanged and no encoding header
930            assert_eq!(result_body, body);
931            assert_eq!(content_encoding, None);
932        }
933
934        #[cfg(not(feature = "gzip-http"))]
935        #[test]
936        fn test_gzip_error_when_feature_disabled() {
937            let client = OtlpHttpClient::new(
938                std::sync::Arc::new(MockHttpClient),
939                "http://localhost:4318".parse().unwrap(),
940                std::collections::HashMap::new(),
941                crate::Protocol::HttpBinary,
942                std::time::Duration::from_secs(10),
943                Some(crate::Compression::Gzip),
944            );
945
946            let body = vec![1, 2, 3, 4];
947            let result = client.process_body(body);
948
949            // Should return error when gzip requested but feature not enabled
950            assert!(result.is_err());
951            assert!(result
952                .unwrap_err()
953                .contains("gzip-http feature not enabled"));
954        }
955
956        #[cfg(not(feature = "zstd-http"))]
957        #[test]
958        fn test_zstd_error_when_feature_disabled() {
959            let client = OtlpHttpClient::new(
960                std::sync::Arc::new(MockHttpClient),
961                "http://localhost:4318".parse().unwrap(),
962                std::collections::HashMap::new(),
963                crate::Protocol::HttpBinary,
964                std::time::Duration::from_secs(10),
965                Some(crate::Compression::Zstd),
966            );
967
968            let body = vec![1, 2, 3, 4];
969            let result = client.process_body(body);
970
971            // Should return error when zstd requested but feature not enabled
972            assert!(result.is_err());
973            assert!(result
974                .unwrap_err()
975                .contains("zstd-http feature not enabled"));
976        }
977
978        // Mock HTTP client for testing
979        #[derive(Debug)]
980        struct MockHttpClient;
981
982        #[async_trait::async_trait]
983        impl HttpClient for MockHttpClient {
984            async fn send_bytes(
985                &self,
986                _request: http::Request<Bytes>,
987            ) -> Result<http::Response<Bytes>, opentelemetry_http::HttpError> {
988                Ok(http::Response::builder()
989                    .status(200)
990                    .body(Bytes::new())
991                    .unwrap())
992            }
993        }
994    }
995
996    mod export_body_tests {
997        use super::super::OtlpHttpClient;
998        use opentelemetry_http::{Bytes, HttpClient};
999        use std::collections::HashMap;
1000
1001        #[derive(Debug)]
1002        struct MockHttpClient;
1003
1004        #[async_trait::async_trait]
1005        impl HttpClient for MockHttpClient {
1006            async fn send_bytes(
1007                &self,
1008                _request: http::Request<Bytes>,
1009            ) -> Result<http::Response<Bytes>, opentelemetry_http::HttpError> {
1010                Ok(http::Response::builder()
1011                    .status(200)
1012                    .body(Bytes::new())
1013                    .unwrap())
1014            }
1015        }
1016
1017        fn create_test_client(
1018            protocol: crate::Protocol,
1019            compression: Option<crate::Compression>,
1020        ) -> OtlpHttpClient {
1021            OtlpHttpClient::new(
1022                std::sync::Arc::new(MockHttpClient),
1023                "http://localhost:4318".parse().unwrap(),
1024                HashMap::new(),
1025                protocol,
1026                std::time::Duration::from_secs(10),
1027                compression,
1028            )
1029        }
1030
1031        fn create_test_span_data() -> opentelemetry_sdk::trace::SpanData {
1032            use opentelemetry::trace::Status;
1033            use opentelemetry::trace::{
1034                SpanContext, SpanId, SpanKind, TraceFlags, TraceId, TraceState,
1035            };
1036            use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks};
1037            use std::borrow::Cow;
1038            use std::time::{Duration, SystemTime};
1039
1040            let span_context = SpanContext::new(
1041                TraceId::from(123),
1042                SpanId::from(456),
1043                TraceFlags::default(),
1044                false,
1045                TraceState::default(),
1046            );
1047            SpanData {
1048                span_context,
1049                parent_span_id: SpanId::from(0),
1050                parent_span_is_remote: false,
1051                span_kind: SpanKind::Internal,
1052                name: Cow::Borrowed("test_span"),
1053                start_time: SystemTime::UNIX_EPOCH,
1054                end_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1),
1055                attributes: vec![],
1056                dropped_attributes_count: 0,
1057                events: SpanEvents::default(),
1058                links: SpanLinks::default(),
1059                status: Status::Unset,
1060                instrumentation_scope: opentelemetry::InstrumentationScope::default(),
1061            }
1062        }
1063
1064        #[cfg(feature = "trace")]
1065        #[test]
1066        fn test_build_trace_export_body_binary_protocol() {
1067            let client = create_test_client(crate::Protocol::HttpBinary, None);
1068            let span_data = create_test_span_data();
1069
1070            let result = client.build_trace_export_body(vec![span_data]).unwrap();
1071            let (_body, content_type, content_encoding) = result;
1072
1073            assert_eq!(content_type, "application/x-protobuf");
1074            assert_eq!(content_encoding, None);
1075        }
1076
1077        #[cfg(all(feature = "trace", feature = "http-json"))]
1078        #[test]
1079        fn test_build_trace_export_body_json_protocol() {
1080            let client = create_test_client(crate::Protocol::HttpJson, None);
1081            let span_data = create_test_span_data();
1082
1083            let result = client.build_trace_export_body(vec![span_data]).unwrap();
1084            let (_body, content_type, content_encoding) = result;
1085
1086            assert_eq!(content_type, "application/json");
1087            assert_eq!(content_encoding, None);
1088        }
1089
1090        #[cfg(all(feature = "trace", feature = "gzip-http"))]
1091        #[test]
1092        fn test_build_trace_export_body_with_compression() {
1093            let client =
1094                create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip));
1095            let span_data = create_test_span_data();
1096
1097            let result = client.build_trace_export_body(vec![span_data]).unwrap();
1098            let (_body, content_type, content_encoding) = result;
1099
1100            assert_eq!(content_type, "application/x-protobuf");
1101            assert_eq!(content_encoding, Some("gzip"));
1102        }
1103
1104        #[cfg(feature = "logs")]
1105        fn create_test_log_batch() -> opentelemetry_sdk::logs::LogBatch<'static> {
1106            use opentelemetry_sdk::logs::LogBatch;
1107
1108            // Use empty batch for simplicity - the method should still handle protocol/compression correctly
1109            LogBatch::new(&[])
1110        }
1111
1112        #[cfg(feature = "logs")]
1113        #[test]
1114        fn test_build_logs_export_body_binary_protocol() {
1115            let client = create_test_client(crate::Protocol::HttpBinary, None);
1116            let batch = create_test_log_batch();
1117
1118            let result = client.build_logs_export_body(batch).unwrap();
1119            let (_body, content_type, content_encoding) = result;
1120
1121            assert_eq!(content_type, "application/x-protobuf");
1122            assert_eq!(content_encoding, None);
1123        }
1124
1125        #[cfg(all(feature = "logs", feature = "http-json"))]
1126        #[test]
1127        fn test_build_logs_export_body_json_protocol() {
1128            let client = create_test_client(crate::Protocol::HttpJson, None);
1129            let batch = create_test_log_batch();
1130
1131            let result = client.build_logs_export_body(batch).unwrap();
1132            let (_body, content_type, content_encoding) = result;
1133
1134            assert_eq!(content_type, "application/json");
1135            assert_eq!(content_encoding, None);
1136        }
1137
1138        #[cfg(all(feature = "logs", feature = "gzip-http"))]
1139        #[test]
1140        fn test_build_logs_export_body_with_compression() {
1141            let client =
1142                create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip));
1143            let batch = create_test_log_batch();
1144
1145            let result = client.build_logs_export_body(batch).unwrap();
1146            let (_body, content_type, content_encoding) = result;
1147
1148            assert_eq!(content_type, "application/x-protobuf");
1149            assert_eq!(content_encoding, Some("gzip"));
1150        }
1151
1152        #[cfg(feature = "metrics")]
1153        #[test]
1154        fn test_build_metrics_export_body_binary_protocol() {
1155            use opentelemetry_sdk::metrics::data::ResourceMetrics;
1156
1157            let client = create_test_client(crate::Protocol::HttpBinary, None);
1158            let metrics = ResourceMetrics::default();
1159
1160            let result = client.build_metrics_export_body(&metrics).unwrap();
1161            let (_body, content_type, content_encoding) = result;
1162
1163            assert_eq!(content_type, "application/x-protobuf");
1164            assert_eq!(content_encoding, None);
1165        }
1166
1167        #[cfg(all(feature = "metrics", feature = "http-json"))]
1168        #[test]
1169        fn test_build_metrics_export_body_json_protocol() {
1170            use opentelemetry_sdk::metrics::data::ResourceMetrics;
1171
1172            let client = create_test_client(crate::Protocol::HttpJson, None);
1173            let metrics = ResourceMetrics::default();
1174
1175            let result = client.build_metrics_export_body(&metrics).unwrap();
1176            let (_body, content_type, content_encoding) = result;
1177
1178            assert_eq!(content_type, "application/json");
1179            assert_eq!(content_encoding, None);
1180        }
1181
1182        #[cfg(all(feature = "metrics", feature = "gzip-http"))]
1183        #[test]
1184        fn test_build_metrics_export_body_with_compression() {
1185            use opentelemetry_sdk::metrics::data::ResourceMetrics;
1186
1187            let client =
1188                create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip));
1189            let metrics = ResourceMetrics::default();
1190
1191            let result = client.build_metrics_export_body(&metrics).unwrap();
1192            let (_body, content_type, content_encoding) = result;
1193
1194            assert_eq!(content_type, "application/x-protobuf");
1195            assert_eq!(content_encoding, Some("gzip"));
1196        }
1197
1198        #[cfg(all(feature = "metrics", not(feature = "gzip-http")))]
1199        #[test]
1200        fn test_build_metrics_export_body_compression_error_returns_none() {
1201            use opentelemetry_sdk::metrics::data::ResourceMetrics;
1202
1203            let client =
1204                create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip));
1205            let metrics = ResourceMetrics::default();
1206
1207            // Should return None when compression fails (feature not enabled)
1208            let result = client.build_metrics_export_body(&metrics);
1209            assert!(result.is_none());
1210        }
1211
1212        #[test]
1213        fn test_resolve_compression_uses_generic_env_fallback() {
1214            use super::super::HttpExporterBuilder;
1215            use crate::exporter::tests::run_env_test;
1216
1217            // Test that generic OTEL_EXPORTER_OTLP_COMPRESSION is used when signal-specific env var is not set
1218            run_env_test(
1219                vec![(crate::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip")],
1220                || {
1221                    let builder = HttpExporterBuilder::default();
1222                    let result = builder
1223                        .resolve_compression("NONEXISTENT_SIGNAL_COMPRESSION")
1224                        .unwrap();
1225                    assert_eq!(result, Some(crate::Compression::Gzip));
1226                },
1227            );
1228        }
1229
1230        #[cfg(all(feature = "trace", not(feature = "gzip-http")))]
1231        #[test]
1232        fn test_build_span_exporter_with_gzip_without_feature() {
1233            use super::super::HttpExporterBuilder;
1234            use crate::{ExporterBuildError, WithHttpConfig};
1235
1236            let builder = HttpExporterBuilder::default().with_compression(crate::Compression::Gzip);
1237
1238            let result = builder.build_span_exporter();
1239            // This test will fail until the issue is fixed: compression validation should happen at build time
1240            assert!(matches!(
1241                result,
1242                Err(ExporterBuildError::UnsupportedCompressionAlgorithm(_))
1243            ));
1244        }
1245
1246        #[cfg(all(feature = "trace", not(feature = "zstd-http")))]
1247        #[test]
1248        fn test_build_span_exporter_with_zstd_without_feature() {
1249            use super::super::HttpExporterBuilder;
1250            use crate::{ExporterBuildError, WithHttpConfig};
1251
1252            let builder = HttpExporterBuilder::default().with_compression(crate::Compression::Zstd);
1253
1254            let result = builder.build_span_exporter();
1255            // This test will fail until the issue is fixed: compression validation should happen at build time
1256            assert!(matches!(
1257                result,
1258                Err(ExporterBuildError::UnsupportedCompressionAlgorithm(_))
1259            ));
1260        }
1261    }
1262}