1use super::{
2 default_headers, default_protocol, parse_header_string, resolve_timeout, ExporterBuildError,
3 OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT,
4};
5use crate::{ExportConfig, Protocol, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS};
6use http::{HeaderName, HeaderValue, Uri};
7use opentelemetry::otel_debug;
8use opentelemetry_http::HttpClient;
9use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
10#[cfg(feature = "logs")]
11use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
12#[cfg(feature = "trace")]
13use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
14#[cfg(feature = "logs")]
15use opentelemetry_sdk::logs::LogBatch;
16#[cfg(feature = "trace")]
17use opentelemetry_sdk::trace::SpanData;
18use prost::Message;
19use std::collections::HashMap;
20use std::env;
21use std::str::FromStr;
22use std::sync::{Arc, Mutex};
23use std::time::Duration;
24
25#[cfg(feature = "metrics")]
26mod metrics;
27
28#[cfg(feature = "metrics")]
29use opentelemetry_sdk::metrics::data::ResourceMetrics;
30
31#[cfg(feature = "logs")]
32pub(crate) mod logs;
33
34#[cfg(feature = "trace")]
35mod trace;
36
37#[cfg(all(
38 not(feature = "reqwest-client"),
39 not(feature = "reqwest-blocking-client"),
40 feature = "hyper-client"
41))]
42use opentelemetry_http::hyper::HyperClient;
43
44#[derive(Debug, Default)]
46pub struct HttpConfig {
47 client: Option<Arc<dyn HttpClient>>,
49
50 headers: Option<HashMap<String, String>>,
52
53 compression: Option<crate::Compression>,
55}
56
57#[derive(Debug)]
85pub struct HttpExporterBuilder {
86 pub(crate) exporter_config: ExportConfig,
87 pub(crate) http_config: HttpConfig,
88}
89
90impl Default for HttpExporterBuilder {
91 fn default() -> Self {
92 HttpExporterBuilder {
93 exporter_config: ExportConfig {
94 protocol: default_protocol(),
95 ..ExportConfig::default()
96 },
97 http_config: HttpConfig {
98 headers: Some(default_headers()),
99 ..HttpConfig::default()
100 },
101 }
102 }
103}
104
105impl HttpExporterBuilder {
106 fn build_client(
107 &mut self,
108 signal_endpoint_var: &str,
109 signal_endpoint_path: &str,
110 signal_timeout_var: &str,
111 signal_http_headers_var: &str,
112 signal_compression_var: &str,
113 ) -> Result<OtlpHttpClient, ExporterBuildError> {
114 let endpoint = resolve_http_endpoint(
115 signal_endpoint_var,
116 signal_endpoint_path,
117 self.exporter_config.endpoint.as_deref(),
118 )?;
119
120 let compression = self.resolve_compression(signal_compression_var)?;
121
122 if let Some(compression_alg) = &compression {
124 match compression_alg {
125 crate::Compression::Gzip => {
126 #[cfg(not(feature = "gzip-http"))]
127 {
128 return Err(ExporterBuildError::UnsupportedCompressionAlgorithm(
129 "gzip compression requested but gzip-http feature not enabled"
130 .to_string(),
131 ));
132 }
133 }
134 crate::Compression::Zstd => {
135 #[cfg(not(feature = "zstd-http"))]
136 {
137 return Err(ExporterBuildError::UnsupportedCompressionAlgorithm(
138 "zstd compression requested but zstd-http feature not enabled"
139 .to_string(),
140 ));
141 }
142 }
143 }
144 }
145
146 let timeout = resolve_timeout(signal_timeout_var, self.exporter_config.timeout.as_ref());
147
148 #[allow(unused_mut)] let mut http_client = self.http_config.client.take();
150
151 if http_client.is_none() {
152 #[cfg(all(
153 not(feature = "reqwest-client"),
154 not(feature = "reqwest-blocking-client"),
155 feature = "hyper-client"
156 ))]
157 {
158 http_client = Some(Arc::new(HyperClient::with_default_connector(timeout, None))
160 as Arc<dyn HttpClient>);
161 }
162 #[cfg(all(
163 not(feature = "hyper-client"),
164 not(feature = "reqwest-blocking-client"),
165 feature = "reqwest-client"
166 ))]
167 {
168 http_client = Some(Arc::new(
169 reqwest::Client::builder()
170 .timeout(timeout)
171 .build()
172 .unwrap_or_default(),
173 ) as Arc<dyn HttpClient>);
174 }
175 #[cfg(all(
176 not(feature = "hyper-client"),
177 not(feature = "reqwest-client"),
178 feature = "reqwest-blocking-client"
179 ))]
180 {
181 let timeout_clone = timeout;
182 http_client = Some(Arc::new(
183 std::thread::spawn(move || {
184 reqwest::blocking::Client::builder()
185 .timeout(timeout_clone)
186 .build()
187 .unwrap_or_else(|_| reqwest::blocking::Client::new())
188 })
189 .join()
190 .unwrap(), ) as Arc<dyn HttpClient>);
192 }
193 }
194
195 let http_client = http_client.ok_or(ExporterBuildError::NoHttpClient)?;
196
197 #[allow(clippy::mutable_key_type)] let mut headers: HashMap<HeaderName, HeaderValue> = self
199 .http_config
200 .headers
201 .take()
202 .unwrap_or_default()
203 .into_iter()
204 .filter_map(|(k, v)| {
205 Some((
206 HeaderName::from_str(&k).ok()?,
207 HeaderValue::from_str(&v).ok()?,
208 ))
209 })
210 .collect();
211
212 if let Ok(input) =
214 env::var(signal_http_headers_var).or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
215 {
216 add_header_from_string(&input, &mut headers);
217 }
218
219 Ok(OtlpHttpClient::new(
220 http_client,
221 endpoint,
222 headers,
223 self.exporter_config.protocol,
224 timeout,
225 compression,
226 ))
227 }
228
229 fn resolve_compression(
230 &self,
231 env_override: &str,
232 ) -> Result<Option<crate::Compression>, super::ExporterBuildError> {
233 super::resolve_compression_from_env(self.http_config.compression, env_override)
234 }
235
236 #[cfg(feature = "trace")]
238 pub fn build_span_exporter(mut self) -> Result<crate::SpanExporter, ExporterBuildError> {
239 use crate::{
240 OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
241 OTEL_EXPORTER_OTLP_TRACES_HEADERS, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
242 };
243
244 let client = self.build_client(
245 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
246 "/v1/traces",
247 OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
248 OTEL_EXPORTER_OTLP_TRACES_HEADERS,
249 OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
250 )?;
251
252 Ok(crate::SpanExporter::from_http(client))
253 }
254
255 #[cfg(feature = "logs")]
257 pub fn build_log_exporter(mut self) -> Result<crate::LogExporter, ExporterBuildError> {
258 use crate::{
259 OTEL_EXPORTER_OTLP_LOGS_COMPRESSION, OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
260 OTEL_EXPORTER_OTLP_LOGS_HEADERS, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
261 };
262
263 let client = self.build_client(
264 OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
265 "/v1/logs",
266 OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
267 OTEL_EXPORTER_OTLP_LOGS_HEADERS,
268 OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
269 )?;
270
271 Ok(crate::LogExporter::from_http(client))
272 }
273
274 #[cfg(feature = "metrics")]
276 pub fn build_metrics_exporter(
277 mut self,
278 temporality: opentelemetry_sdk::metrics::Temporality,
279 ) -> Result<crate::MetricExporter, ExporterBuildError> {
280 use crate::{
281 OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
282 OTEL_EXPORTER_OTLP_METRICS_HEADERS, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
283 };
284
285 let client = self.build_client(
286 OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
287 "/v1/metrics",
288 OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
289 OTEL_EXPORTER_OTLP_METRICS_HEADERS,
290 OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
291 )?;
292
293 Ok(crate::MetricExporter::from_http(client, temporality))
294 }
295}
296
297#[derive(Debug)]
298pub(crate) struct OtlpHttpClient {
299 client: Mutex<Option<Arc<dyn HttpClient>>>,
300 collector_endpoint: Uri,
301 headers: HashMap<HeaderName, HeaderValue>,
302 protocol: Protocol,
303 _timeout: Duration,
304 compression: Option<crate::Compression>,
305 #[allow(dead_code)]
306 resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
308}
309
310impl OtlpHttpClient {
311 fn process_body(&self, body: Vec<u8>) -> Result<(Vec<u8>, Option<&'static str>), String> {
315 match self.compression {
316 #[cfg(feature = "gzip-http")]
317 Some(crate::Compression::Gzip) => {
318 use flate2::{write::GzEncoder, Compression};
319 use std::io::Write;
320
321 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
322 encoder.write_all(&body).map_err(|e| e.to_string())?;
323 let compressed = encoder.finish().map_err(|e| e.to_string())?;
324 Ok((compressed, Some("gzip")))
325 }
326 #[cfg(not(feature = "gzip-http"))]
327 Some(crate::Compression::Gzip) => {
328 Err("gzip compression requested but gzip-http feature not enabled".to_string())
329 }
330 #[cfg(feature = "zstd-http")]
331 Some(crate::Compression::Zstd) => {
332 let compressed = zstd::bulk::compress(&body, 0).map_err(|e| e.to_string())?;
333 Ok((compressed, Some("zstd")))
334 }
335 #[cfg(not(feature = "zstd-http"))]
336 Some(crate::Compression::Zstd) => {
337 Err("zstd compression requested but zstd-http feature not enabled".to_string())
338 }
339 None => Ok((body, None)),
340 }
341 }
342
343 #[allow(clippy::mutable_key_type)] fn new(
345 client: Arc<dyn HttpClient>,
346 collector_endpoint: Uri,
347 headers: HashMap<HeaderName, HeaderValue>,
348 protocol: Protocol,
349 timeout: Duration,
350 compression: Option<crate::Compression>,
351 ) -> Self {
352 OtlpHttpClient {
353 client: Mutex::new(Some(client)),
354 collector_endpoint,
355 headers,
356 protocol,
357 _timeout: timeout,
358 compression,
359 resource: ResourceAttributesWithSchema::default(),
360 }
361 }
362
363 #[cfg(feature = "trace")]
364 fn build_trace_export_body(
365 &self,
366 spans: Vec<SpanData>,
367 ) -> Result<(Vec<u8>, &'static str, Option<&'static str>), String> {
368 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
369 let resource_spans = group_spans_by_resource_and_scope(spans, &self.resource);
370
371 let req = ExportTraceServiceRequest { resource_spans };
372 let (body, content_type) = match self.protocol {
373 #[cfg(feature = "http-json")]
374 Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
375 Ok(json) => (json.into_bytes(), "application/json"),
376 Err(e) => return Err(e.to_string()),
377 },
378 _ => (req.encode_to_vec(), "application/x-protobuf"),
379 };
380
381 let (processed_body, content_encoding) = self.process_body(body)?;
382 Ok((processed_body, content_type, content_encoding))
383 }
384
385 #[cfg(feature = "logs")]
386 fn build_logs_export_body(
387 &self,
388 logs: LogBatch<'_>,
389 ) -> Result<(Vec<u8>, &'static str, Option<&'static str>), String> {
390 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
391 let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
392 let req = ExportLogsServiceRequest { resource_logs };
393
394 let (body, content_type) = match self.protocol {
395 #[cfg(feature = "http-json")]
396 Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
397 Ok(json) => (json.into_bytes(), "application/json"),
398 Err(e) => return Err(e.to_string()),
399 },
400 _ => (req.encode_to_vec(), "application/x-protobuf"),
401 };
402
403 let (processed_body, content_encoding) = self.process_body(body)?;
404 Ok((processed_body, content_type, content_encoding))
405 }
406
407 #[cfg(feature = "metrics")]
408 fn build_metrics_export_body(
409 &self,
410 metrics: &ResourceMetrics,
411 ) -> Option<(Vec<u8>, &'static str, Option<&'static str>)> {
412 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
413
414 let req: ExportMetricsServiceRequest = metrics.into();
415
416 let (body, content_type) = match self.protocol {
417 #[cfg(feature = "http-json")]
418 Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
419 Ok(json) => (json.into_bytes(), "application/json"),
420 Err(e) => {
421 otel_debug!(name: "JsonSerializationFaied", error = e.to_string());
422 return None;
423 }
424 },
425 _ => (req.encode_to_vec(), "application/x-protobuf"),
426 };
427
428 match self.process_body(body) {
429 Ok((processed_body, content_encoding)) => {
430 Some((processed_body, content_type, content_encoding))
431 }
432 Err(e) => {
433 otel_debug!(name: "CompressionFailed", error = e);
434 None
435 }
436 }
437 }
438}
439
440fn build_endpoint_uri(endpoint: &str, path: &str) -> Result<Uri, ExporterBuildError> {
441 let path = if endpoint.ends_with('/') && path.starts_with('/') {
442 path.strip_prefix('/').unwrap()
443 } else {
444 path
445 };
446 let endpoint = format!("{endpoint}{path}");
447 endpoint.parse().map_err(|er: http::uri::InvalidUri| {
448 ExporterBuildError::InvalidUri(endpoint, er.to_string())
449 })
450}
451
452fn resolve_http_endpoint(
454 signal_endpoint_var: &str,
455 signal_endpoint_path: &str,
456 provided_endpoint: Option<&str>,
457) -> Result<Uri, ExporterBuildError> {
458 if let Some(provider_endpoint) = provided_endpoint.filter(|s| !s.is_empty()) {
460 provider_endpoint
461 .parse()
462 .map_err(|er: http::uri::InvalidUri| {
463 ExporterBuildError::InvalidUri(provider_endpoint.to_string(), er.to_string())
464 })
465 } else if let Some(endpoint) = env::var(signal_endpoint_var)
466 .ok()
467 .and_then(|s| s.parse().ok())
468 {
469 Ok(endpoint)
471 } else if let Some(endpoint) = env::var(OTEL_EXPORTER_OTLP_ENDPOINT)
472 .ok()
473 .and_then(|s| build_endpoint_uri(&s, signal_endpoint_path).ok())
474 {
475 Ok(endpoint)
477 } else {
478 build_endpoint_uri(
479 OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT,
480 signal_endpoint_path,
481 )
482 }
483}
484
485#[allow(clippy::mutable_key_type)] fn add_header_from_string(input: &str, headers: &mut HashMap<HeaderName, HeaderValue>) {
487 headers.extend(parse_header_string(input).filter_map(|(key, value)| {
488 Some((
489 HeaderName::from_str(key).ok()?,
490 HeaderValue::from_str(&value).ok()?,
491 ))
492 }));
493}
494
495pub trait HasHttpConfig {
497 fn http_client_config(&mut self) -> &mut HttpConfig;
499}
500
501impl HasHttpConfig for HttpExporterBuilder {
503 fn http_client_config(&mut self) -> &mut HttpConfig {
504 &mut self.http_config
505 }
506}
507
508pub trait WithHttpConfig {
521 fn with_http_client<T: HttpClient + 'static>(self, client: T) -> Self;
523
524 fn with_headers(self, headers: HashMap<String, String>) -> Self;
526
527 fn with_compression(self, compression: crate::Compression) -> Self;
529}
530
531impl<B: HasHttpConfig> WithHttpConfig for B {
532 fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
533 self.http_client_config().client = Some(Arc::new(client));
534 self
535 }
536
537 fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
538 let http_client_headers = self
540 .http_client_config()
541 .headers
542 .get_or_insert(HashMap::new());
543 headers.into_iter().for_each(|(key, value)| {
544 http_client_headers.insert(key, super::url_decode(&value).unwrap_or(value));
545 });
546 self
547 }
548
549 fn with_compression(mut self, compression: crate::Compression) -> Self {
550 self.http_client_config().compression = Some(compression);
551 self
552 }
553}
554
555#[cfg(test)]
556mod tests {
557 use crate::exporter::http::HttpConfig;
558 use crate::exporter::tests::run_env_test;
559 use crate::{
560 HttpExporterBuilder, WithExportConfig, WithHttpConfig, OTEL_EXPORTER_OTLP_ENDPOINT,
561 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
562 };
563
564 use super::{build_endpoint_uri, resolve_http_endpoint};
565
566 #[test]
567 fn test_append_signal_path_to_generic_env() {
568 run_env_test(
569 vec![(OTEL_EXPORTER_OTLP_ENDPOINT, "http://example.com")],
570 || {
571 let endpoint =
572 resolve_http_endpoint(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "/v1/traces", None)
573 .unwrap();
574 assert_eq!(endpoint, "http://example.com/v1/traces");
575 },
576 )
577 }
578
579 #[test]
580 fn test_not_append_signal_path_to_signal_env() {
581 run_env_test(
582 vec![(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://example.com")],
583 || {
584 let endpoint =
585 resolve_http_endpoint(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "/v1/traces", None)
586 .unwrap();
587 assert_eq!(endpoint, "http://example.com");
588 },
589 )
590 }
591
592 #[test]
593 fn test_priority_of_signal_env_over_generic_env() {
594 run_env_test(
595 vec![
596 (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://example.com"),
597 (OTEL_EXPORTER_OTLP_ENDPOINT, "http://wrong.com"),
598 ],
599 || {
600 let endpoint = super::resolve_http_endpoint(
601 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
602 "/v1/traces",
603 None,
604 )
605 .unwrap();
606 assert_eq!(endpoint, "http://example.com");
607 },
608 );
609 }
610
611 #[test]
612 fn test_priority_of_code_based_config_over_envs() {
613 run_env_test(
614 vec![
615 (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://example.com"),
616 (OTEL_EXPORTER_OTLP_ENDPOINT, "http://wrong.com"),
617 ],
618 || {
619 let endpoint = super::resolve_http_endpoint(
620 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
621 "/v1/traces",
622 Some("http://localhost:4317"),
623 )
624 .unwrap();
625 assert_eq!(endpoint, "http://localhost:4317");
626 },
627 );
628 }
629
630 #[test]
631 fn test_use_default_when_empty_string_for_option() {
632 run_env_test(vec![], || {
633 let endpoint =
634 super::resolve_http_endpoint("non_existent_var", "/v1/traces", Some("")).unwrap();
635 assert_eq!(endpoint, "http://localhost:4318/v1/traces");
636 });
637 }
638
639 #[test]
640 fn test_use_default_when_others_missing() {
641 run_env_test(vec![], || {
642 let endpoint =
643 super::resolve_http_endpoint("NON_EXISTENT_VAR", "/v1/traces", None).unwrap();
644 assert_eq!(endpoint, "http://localhost:4318/v1/traces");
645 });
646 }
647
648 #[test]
649 fn test_build_endpoint_uri() {
650 let uri = build_endpoint_uri("https://example.com", "/v1/traces").unwrap();
651 assert_eq!(uri, "https://example.com/v1/traces");
652
653 let uri = build_endpoint_uri("https://example.com/", "/v1/traces").unwrap();
655 assert_eq!(uri, "https://example.com/v1/traces");
656
657 let uri = build_endpoint_uri("https://example.com/additional/path/", "/v1/traces").unwrap();
659 assert_eq!(uri, "https://example.com/additional/path/v1/traces");
660 }
661
662 #[test]
663 fn test_invalid_uri_in_signal_env_falls_back_to_generic_env() {
664 run_env_test(
665 vec![
666 (
667 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
668 "-*/*-/*-//-/-/invalid-uri",
669 ),
670 (OTEL_EXPORTER_OTLP_ENDPOINT, "http://example.com"),
671 ],
672 || {
673 let endpoint = super::resolve_http_endpoint(
674 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
675 "/v1/traces",
676 None,
677 )
678 .unwrap();
679 assert_eq!(endpoint, "http://example.com/v1/traces");
680 },
681 );
682 }
683
684 #[test]
685 fn test_all_invalid_urls_falls_back_to_error() {
686 run_env_test(vec![], || {
687 let result = super::resolve_http_endpoint(
688 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
689 "/v1/traces",
690 Some("-*/*-/*-//-/-/yet-another-invalid-uri"),
691 );
692 assert!(result.is_err());
693 });
695 }
696
697 #[test]
698 fn test_add_header_from_string() {
699 use http::{HeaderName, HeaderValue};
700 use std::collections::HashMap;
701 let test_cases = vec![
702 ("k1=v1", vec![("k1", "v1")]),
704 ("k1=v1,k2=v2", vec![("k1", "v1"), ("k2", "v2")]),
705 ("k1=v1=10,k2,k3", vec![("k1", "v1=10")]),
706 ("k1=v1,,,k2,k3=10", vec![("k1", "v1"), ("k3", "10")]),
707 ];
708
709 for (input_str, expected_headers) in test_cases {
710 #[allow(clippy::mutable_key_type)] let mut headers: HashMap<HeaderName, HeaderValue> = HashMap::new();
712 super::add_header_from_string(input_str, &mut headers);
713
714 assert_eq!(
715 headers.len(),
716 expected_headers.len(),
717 "Failed on input: {input_str}"
718 );
719
720 for (expected_key, expected_value) in expected_headers {
721 assert_eq!(
722 headers.get(&HeaderName::from_static(expected_key)),
723 Some(&HeaderValue::from_static(expected_value)),
724 "Failed on key: {expected_key} with input: {input_str}"
725 );
726 }
727 }
728 }
729
730 #[test]
731 fn test_merge_header_from_string() {
732 use http::{HeaderName, HeaderValue};
733 use std::collections::HashMap;
734 #[allow(clippy::mutable_key_type)] let mut headers: HashMap<HeaderName, HeaderValue> = std::collections::HashMap::new();
736 headers.insert(
737 HeaderName::from_static("k1"),
738 HeaderValue::from_static("v1"),
739 );
740 headers.insert(
741 HeaderName::from_static("k2"),
742 HeaderValue::from_static("v2"),
743 );
744 let test_cases = vec![
745 ("k1=v1_new", vec![("k1", "v1_new"), ("k2", "v2")]),
747 (
748 "k3=val=10,22,34,k4=,k5=10",
749 vec![
750 ("k1", "v1_new"),
751 ("k2", "v2"),
752 ("k3", "val=10"),
753 ("k5", "10"),
754 ],
755 ),
756 ];
757
758 for (input_str, expected_headers) in test_cases {
759 super::add_header_from_string(input_str, &mut headers);
760
761 assert_eq!(
762 headers.len(),
763 expected_headers.len(),
764 "Failed on input: {input_str}"
765 );
766
767 for (expected_key, expected_value) in expected_headers {
768 assert_eq!(
769 headers.get(&HeaderName::from_static(expected_key)),
770 Some(&HeaderValue::from_static(expected_value)),
771 "Failed on key: {expected_key} with input: {input_str}"
772 );
773 }
774 }
775 }
776
777 #[test]
778 fn test_http_exporter_builder_with_headers() {
779 use std::collections::HashMap;
780 let initial_headers = HashMap::from([("k1".to_string(), "v1".to_string())]);
782 let extra_headers = HashMap::from([
783 ("k2".to_string(), "v2".to_string()),
784 ("k3".to_string(), "v3".to_string()),
785 ]);
786 let expected_headers = initial_headers.iter().chain(extra_headers.iter()).fold(
787 HashMap::new(),
788 |mut acc, (k, v)| {
789 acc.insert(k.clone(), v.clone());
790 acc
791 },
792 );
793 let builder = HttpExporterBuilder {
794 http_config: HttpConfig {
795 client: None,
796 headers: Some(initial_headers),
797 compression: None,
798 },
799 exporter_config: crate::ExportConfig::default(),
800 };
801
802 let builder = builder.with_headers(extra_headers);
804
805 assert_eq!(
807 builder
808 .http_config
809 .headers
810 .clone()
811 .expect("headers should always be Some"),
812 expected_headers,
813 );
814 }
815
816 #[test]
817 fn test_http_exporter_endpoint() {
818 run_env_test(vec![], || {
820 let exporter = HttpExporterBuilder::default();
821
822 let url = resolve_http_endpoint(
823 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
824 "/v1/traces",
825 exporter.exporter_config.endpoint.as_deref(),
826 )
827 .unwrap();
828
829 assert_eq!(url, "http://localhost:4318/v1/traces");
830 });
831
832 run_env_test(vec![], || {
834 let exporter = HttpExporterBuilder::default()
835 .with_endpoint("http://localhost:4318/v1/tracesbutnotreally");
836
837 let url = resolve_http_endpoint(
838 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
839 "/v1/traces",
840 exporter.exporter_config.endpoint.as_deref(),
841 )
842 .unwrap();
843
844 assert_eq!(url, "http://localhost:4318/v1/tracesbutnotreally");
845 });
846 }
847
848 #[cfg(feature = "gzip-http")]
849 mod compression_tests {
850 use super::super::OtlpHttpClient;
851 use flate2::read::GzDecoder;
852 use opentelemetry_http::{Bytes, HttpClient};
853 use std::io::Read;
854
855 #[test]
856 fn test_gzip_compression_and_decompression() {
857 let client = OtlpHttpClient::new(
858 std::sync::Arc::new(MockHttpClient),
859 "http://localhost:4318".parse().unwrap(),
860 std::collections::HashMap::new(),
861 crate::Protocol::HttpBinary,
862 std::time::Duration::from_secs(10),
863 Some(crate::Compression::Gzip),
864 );
865
866 let test_data = b"Hello, world! This is test data for compression.";
868 let result = client.process_body(test_data.to_vec()).unwrap();
869 let (compressed_body, content_encoding) = result;
870
871 assert_eq!(content_encoding, Some("gzip"));
873
874 let mut decoder = GzDecoder::new(&compressed_body[..]);
876 let mut decompressed = Vec::new();
877 decoder.read_to_end(&mut decompressed).unwrap();
878
879 assert_eq!(decompressed, test_data);
881 assert_ne!(compressed_body, test_data.to_vec());
883 }
884
885 #[cfg(feature = "zstd-http")]
886 #[test]
887 fn test_zstd_compression_and_decompression() {
888 let client = OtlpHttpClient::new(
889 std::sync::Arc::new(MockHttpClient),
890 "http://localhost:4318".parse().unwrap(),
891 std::collections::HashMap::new(),
892 crate::Protocol::HttpBinary,
893 std::time::Duration::from_secs(10),
894 Some(crate::Compression::Zstd),
895 );
896
897 let test_data = b"Hello, world! This is test data for zstd compression.";
899 let result = client.process_body(test_data.to_vec()).unwrap();
900 let (compressed_body, content_encoding) = result;
901
902 assert_eq!(content_encoding, Some("zstd"));
904
905 let decompressed = zstd::bulk::decompress(&compressed_body, test_data.len()).unwrap();
907
908 assert_eq!(decompressed, test_data);
910 assert_ne!(compressed_body, test_data.to_vec());
912 }
913
914 #[test]
915 fn test_no_compression_when_disabled() {
916 let client = OtlpHttpClient::new(
917 std::sync::Arc::new(MockHttpClient),
918 "http://localhost:4318".parse().unwrap(),
919 std::collections::HashMap::new(),
920 crate::Protocol::HttpBinary,
921 std::time::Duration::from_secs(10),
922 None, );
924
925 let body = vec![1, 2, 3, 4];
926 let result = client.process_body(body.clone()).unwrap();
927 let (result_body, content_encoding) = result;
928
929 assert_eq!(result_body, body);
931 assert_eq!(content_encoding, None);
932 }
933
934 #[cfg(not(feature = "gzip-http"))]
935 #[test]
936 fn test_gzip_error_when_feature_disabled() {
937 let client = OtlpHttpClient::new(
938 std::sync::Arc::new(MockHttpClient),
939 "http://localhost:4318".parse().unwrap(),
940 std::collections::HashMap::new(),
941 crate::Protocol::HttpBinary,
942 std::time::Duration::from_secs(10),
943 Some(crate::Compression::Gzip),
944 );
945
946 let body = vec![1, 2, 3, 4];
947 let result = client.process_body(body);
948
949 assert!(result.is_err());
951 assert!(result
952 .unwrap_err()
953 .contains("gzip-http feature not enabled"));
954 }
955
956 #[cfg(not(feature = "zstd-http"))]
957 #[test]
958 fn test_zstd_error_when_feature_disabled() {
959 let client = OtlpHttpClient::new(
960 std::sync::Arc::new(MockHttpClient),
961 "http://localhost:4318".parse().unwrap(),
962 std::collections::HashMap::new(),
963 crate::Protocol::HttpBinary,
964 std::time::Duration::from_secs(10),
965 Some(crate::Compression::Zstd),
966 );
967
968 let body = vec![1, 2, 3, 4];
969 let result = client.process_body(body);
970
971 assert!(result.is_err());
973 assert!(result
974 .unwrap_err()
975 .contains("zstd-http feature not enabled"));
976 }
977
978 #[derive(Debug)]
980 struct MockHttpClient;
981
982 #[async_trait::async_trait]
983 impl HttpClient for MockHttpClient {
984 async fn send_bytes(
985 &self,
986 _request: http::Request<Bytes>,
987 ) -> Result<http::Response<Bytes>, opentelemetry_http::HttpError> {
988 Ok(http::Response::builder()
989 .status(200)
990 .body(Bytes::new())
991 .unwrap())
992 }
993 }
994 }
995
996 mod export_body_tests {
997 use super::super::OtlpHttpClient;
998 use opentelemetry_http::{Bytes, HttpClient};
999 use std::collections::HashMap;
1000
1001 #[derive(Debug)]
1002 struct MockHttpClient;
1003
1004 #[async_trait::async_trait]
1005 impl HttpClient for MockHttpClient {
1006 async fn send_bytes(
1007 &self,
1008 _request: http::Request<Bytes>,
1009 ) -> Result<http::Response<Bytes>, opentelemetry_http::HttpError> {
1010 Ok(http::Response::builder()
1011 .status(200)
1012 .body(Bytes::new())
1013 .unwrap())
1014 }
1015 }
1016
1017 fn create_test_client(
1018 protocol: crate::Protocol,
1019 compression: Option<crate::Compression>,
1020 ) -> OtlpHttpClient {
1021 OtlpHttpClient::new(
1022 std::sync::Arc::new(MockHttpClient),
1023 "http://localhost:4318".parse().unwrap(),
1024 HashMap::new(),
1025 protocol,
1026 std::time::Duration::from_secs(10),
1027 compression,
1028 )
1029 }
1030
1031 fn create_test_span_data() -> opentelemetry_sdk::trace::SpanData {
1032 use opentelemetry::trace::Status;
1033 use opentelemetry::trace::{
1034 SpanContext, SpanId, SpanKind, TraceFlags, TraceId, TraceState,
1035 };
1036 use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks};
1037 use std::borrow::Cow;
1038 use std::time::{Duration, SystemTime};
1039
1040 let span_context = SpanContext::new(
1041 TraceId::from(123),
1042 SpanId::from(456),
1043 TraceFlags::default(),
1044 false,
1045 TraceState::default(),
1046 );
1047 SpanData {
1048 span_context,
1049 parent_span_id: SpanId::from(0),
1050 parent_span_is_remote: false,
1051 span_kind: SpanKind::Internal,
1052 name: Cow::Borrowed("test_span"),
1053 start_time: SystemTime::UNIX_EPOCH,
1054 end_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1),
1055 attributes: vec![],
1056 dropped_attributes_count: 0,
1057 events: SpanEvents::default(),
1058 links: SpanLinks::default(),
1059 status: Status::Unset,
1060 instrumentation_scope: opentelemetry::InstrumentationScope::default(),
1061 }
1062 }
1063
1064 #[cfg(feature = "trace")]
1065 #[test]
1066 fn test_build_trace_export_body_binary_protocol() {
1067 let client = create_test_client(crate::Protocol::HttpBinary, None);
1068 let span_data = create_test_span_data();
1069
1070 let result = client.build_trace_export_body(vec![span_data]).unwrap();
1071 let (_body, content_type, content_encoding) = result;
1072
1073 assert_eq!(content_type, "application/x-protobuf");
1074 assert_eq!(content_encoding, None);
1075 }
1076
1077 #[cfg(all(feature = "trace", feature = "http-json"))]
1078 #[test]
1079 fn test_build_trace_export_body_json_protocol() {
1080 let client = create_test_client(crate::Protocol::HttpJson, None);
1081 let span_data = create_test_span_data();
1082
1083 let result = client.build_trace_export_body(vec![span_data]).unwrap();
1084 let (_body, content_type, content_encoding) = result;
1085
1086 assert_eq!(content_type, "application/json");
1087 assert_eq!(content_encoding, None);
1088 }
1089
1090 #[cfg(all(feature = "trace", feature = "gzip-http"))]
1091 #[test]
1092 fn test_build_trace_export_body_with_compression() {
1093 let client =
1094 create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip));
1095 let span_data = create_test_span_data();
1096
1097 let result = client.build_trace_export_body(vec![span_data]).unwrap();
1098 let (_body, content_type, content_encoding) = result;
1099
1100 assert_eq!(content_type, "application/x-protobuf");
1101 assert_eq!(content_encoding, Some("gzip"));
1102 }
1103
1104 #[cfg(feature = "logs")]
1105 fn create_test_log_batch() -> opentelemetry_sdk::logs::LogBatch<'static> {
1106 use opentelemetry_sdk::logs::LogBatch;
1107
1108 LogBatch::new(&[])
1110 }
1111
1112 #[cfg(feature = "logs")]
1113 #[test]
1114 fn test_build_logs_export_body_binary_protocol() {
1115 let client = create_test_client(crate::Protocol::HttpBinary, None);
1116 let batch = create_test_log_batch();
1117
1118 let result = client.build_logs_export_body(batch).unwrap();
1119 let (_body, content_type, content_encoding) = result;
1120
1121 assert_eq!(content_type, "application/x-protobuf");
1122 assert_eq!(content_encoding, None);
1123 }
1124
1125 #[cfg(all(feature = "logs", feature = "http-json"))]
1126 #[test]
1127 fn test_build_logs_export_body_json_protocol() {
1128 let client = create_test_client(crate::Protocol::HttpJson, None);
1129 let batch = create_test_log_batch();
1130
1131 let result = client.build_logs_export_body(batch).unwrap();
1132 let (_body, content_type, content_encoding) = result;
1133
1134 assert_eq!(content_type, "application/json");
1135 assert_eq!(content_encoding, None);
1136 }
1137
1138 #[cfg(all(feature = "logs", feature = "gzip-http"))]
1139 #[test]
1140 fn test_build_logs_export_body_with_compression() {
1141 let client =
1142 create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip));
1143 let batch = create_test_log_batch();
1144
1145 let result = client.build_logs_export_body(batch).unwrap();
1146 let (_body, content_type, content_encoding) = result;
1147
1148 assert_eq!(content_type, "application/x-protobuf");
1149 assert_eq!(content_encoding, Some("gzip"));
1150 }
1151
1152 #[cfg(feature = "metrics")]
1153 #[test]
1154 fn test_build_metrics_export_body_binary_protocol() {
1155 use opentelemetry_sdk::metrics::data::ResourceMetrics;
1156
1157 let client = create_test_client(crate::Protocol::HttpBinary, None);
1158 let metrics = ResourceMetrics::default();
1159
1160 let result = client.build_metrics_export_body(&metrics).unwrap();
1161 let (_body, content_type, content_encoding) = result;
1162
1163 assert_eq!(content_type, "application/x-protobuf");
1164 assert_eq!(content_encoding, None);
1165 }
1166
1167 #[cfg(all(feature = "metrics", feature = "http-json"))]
1168 #[test]
1169 fn test_build_metrics_export_body_json_protocol() {
1170 use opentelemetry_sdk::metrics::data::ResourceMetrics;
1171
1172 let client = create_test_client(crate::Protocol::HttpJson, None);
1173 let metrics = ResourceMetrics::default();
1174
1175 let result = client.build_metrics_export_body(&metrics).unwrap();
1176 let (_body, content_type, content_encoding) = result;
1177
1178 assert_eq!(content_type, "application/json");
1179 assert_eq!(content_encoding, None);
1180 }
1181
1182 #[cfg(all(feature = "metrics", feature = "gzip-http"))]
1183 #[test]
1184 fn test_build_metrics_export_body_with_compression() {
1185 use opentelemetry_sdk::metrics::data::ResourceMetrics;
1186
1187 let client =
1188 create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip));
1189 let metrics = ResourceMetrics::default();
1190
1191 let result = client.build_metrics_export_body(&metrics).unwrap();
1192 let (_body, content_type, content_encoding) = result;
1193
1194 assert_eq!(content_type, "application/x-protobuf");
1195 assert_eq!(content_encoding, Some("gzip"));
1196 }
1197
1198 #[cfg(all(feature = "metrics", not(feature = "gzip-http")))]
1199 #[test]
1200 fn test_build_metrics_export_body_compression_error_returns_none() {
1201 use opentelemetry_sdk::metrics::data::ResourceMetrics;
1202
1203 let client =
1204 create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip));
1205 let metrics = ResourceMetrics::default();
1206
1207 let result = client.build_metrics_export_body(&metrics);
1209 assert!(result.is_none());
1210 }
1211
1212 #[test]
1213 fn test_resolve_compression_uses_generic_env_fallback() {
1214 use super::super::HttpExporterBuilder;
1215 use crate::exporter::tests::run_env_test;
1216
1217 run_env_test(
1219 vec![(crate::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip")],
1220 || {
1221 let builder = HttpExporterBuilder::default();
1222 let result = builder
1223 .resolve_compression("NONEXISTENT_SIGNAL_COMPRESSION")
1224 .unwrap();
1225 assert_eq!(result, Some(crate::Compression::Gzip));
1226 },
1227 );
1228 }
1229
1230 #[cfg(all(feature = "trace", not(feature = "gzip-http")))]
1231 #[test]
1232 fn test_build_span_exporter_with_gzip_without_feature() {
1233 use super::super::HttpExporterBuilder;
1234 use crate::{ExporterBuildError, WithHttpConfig};
1235
1236 let builder = HttpExporterBuilder::default().with_compression(crate::Compression::Gzip);
1237
1238 let result = builder.build_span_exporter();
1239 assert!(matches!(
1241 result,
1242 Err(ExporterBuildError::UnsupportedCompressionAlgorithm(_))
1243 ));
1244 }
1245
1246 #[cfg(all(feature = "trace", not(feature = "zstd-http")))]
1247 #[test]
1248 fn test_build_span_exporter_with_zstd_without_feature() {
1249 use super::super::HttpExporterBuilder;
1250 use crate::{ExporterBuildError, WithHttpConfig};
1251
1252 let builder = HttpExporterBuilder::default().with_compression(crate::Compression::Zstd);
1253
1254 let result = builder.build_span_exporter();
1255 assert!(matches!(
1257 result,
1258 Err(ExporterBuildError::UnsupportedCompressionAlgorithm(_))
1259 ));
1260 }
1261 }
1262}