1use std::env;
2use std::fmt::{Debug, Formatter};
3use std::str::FromStr;
4
5use http::{HeaderMap, HeaderName, HeaderValue};
6use opentelemetry::otel_debug;
7use tonic::codec::CompressionEncoding;
8use tonic::metadata::{KeyAndValueRef, MetadataMap};
9use tonic::service::Interceptor;
10use tonic::transport::Channel;
11#[cfg(feature = "tls")]
12use tonic::transport::ClientTlsConfig;
13
14use super::{default_headers, parse_header_string, OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT};
15use super::{resolve_timeout, ExporterBuildError};
16use crate::exporter::Compression;
17use crate::{ExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS};
18
/// Tonic client for the logs signal (`TonicLogsClient`); compiled only with the `logs` feature.
#[cfg(feature = "logs")]
pub(crate) mod logs;

/// Tonic client for the metrics signal (`TonicMetricsClient`); compiled only with the
/// `metrics` feature.
#[cfg(feature = "metrics")]
pub(crate) mod metrics;

/// Tonic client for the traces signal (`TonicTracesClient`); compiled only with the `trace`
/// feature.
#[cfg(feature = "trace")]
pub(crate) mod trace;
27
/// Tonic-specific settings collected by an exporter builder.
///
/// Every field is optional. Unset fields are resolved when the builder creates the
/// channel (see `TonicExporterBuilder::build_channel`).
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct TonicConfig {
    /// Metadata entries that the exporter's interceptor appends to every outgoing request.
    pub(crate) metadata: Option<MetadataMap>,
    /// TLS settings applied to the endpoint when the builder creates its own channel.
    #[cfg(feature = "tls")]
    pub(crate) tls_config: Option<ClientTlsConfig>,
    /// Compression algorithm configured in code; passed to the compression resolver
    /// together with the signal's environment variable name.
    pub(crate) compression: Option<Compression>,
    /// Pre-built transport channel. When set, the builder returns it as-is and does not
    /// resolve an endpoint, timeout or TLS configuration.
    pub(crate) channel: Option<tonic::transport::Channel>,
    /// User-supplied interceptor; it runs after the metadata has been appended to the
    /// request.
    pub(crate) interceptor: Option<BoxInterceptor>,
}
44
45impl TryFrom<Compression> for tonic::codec::CompressionEncoding {
46 type Error = ExporterBuildError;
47
48 fn try_from(value: Compression) -> Result<Self, ExporterBuildError> {
49 match value {
50 #[cfg(feature = "gzip-tonic")]
51 Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip),
52 #[cfg(not(feature = "gzip-tonic"))]
53 Compression::Gzip => Err(ExporterBuildError::FeatureRequiredForCompressionAlgorithm(
54 "gzip-tonic",
55 Compression::Gzip,
56 )),
57 #[cfg(feature = "zstd-tonic")]
58 Compression::Zstd => Ok(tonic::codec::CompressionEncoding::Zstd),
59 #[cfg(not(feature = "zstd-tonic"))]
60 Compression::Zstd => Err(ExporterBuildError::FeatureRequiredForCompressionAlgorithm(
61 "zstd-tonic",
62 Compression::Zstd,
63 )),
64 }
65 }
66}
67
/// Builder for exporters that send OTLP data through a tonic (gRPC) channel.
///
/// It pairs the tonic-specific [`TonicConfig`] with the general [`ExportConfig`].
#[derive(Debug)]
pub struct TonicExporterBuilder {
    /// Tonic transport settings: metadata, TLS, compression, channel and interceptor.
    pub(crate) tonic_config: TonicConfig,
    /// General exporter settings; this module reads its endpoint and timeout and sets
    /// its protocol to gRPC by default.
    pub(crate) exporter_config: ExportConfig,
}
107
108pub(crate) struct BoxInterceptor(Box<dyn Interceptor + Send + Sync>);
109impl tonic::service::Interceptor for BoxInterceptor {
110 fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
111 self.0.call(request)
112 }
113}
114
115impl Debug for BoxInterceptor {
116 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
117 write!(f, "BoxInterceptor(..)")
118 }
119}
120
121impl Default for TonicExporterBuilder {
122 fn default() -> Self {
123 TonicExporterBuilder {
124 tonic_config: TonicConfig {
125 metadata: Some(MetadataMap::from_headers(
126 (&default_headers())
127 .try_into()
128 .expect("Invalid tonic headers"),
129 )),
130 #[cfg(feature = "tls")]
131 tls_config: None,
132 compression: None,
133 channel: Option::default(),
134 interceptor: Option::default(),
135 },
136 exporter_config: ExportConfig {
137 protocol: crate::Protocol::Grpc,
138 ..Default::default()
139 },
140 }
141 }
142}
143
144impl TonicExporterBuilder {
145 #[allow(unused)]
147 fn build_channel(
148 self,
149 signal_endpoint_var: &str,
150 signal_timeout_var: &str,
151 signal_compression_var: &str,
152 signal_headers_var: &str,
153 ) -> Result<(Channel, BoxInterceptor, Option<CompressionEncoding>), ExporterBuildError> {
154 let compression = self.resolve_compression(signal_compression_var)?;
155
156 let (headers_from_env, headers_for_logging) = parse_headers_from_env(signal_headers_var);
157 let metadata = merge_metadata_with_headers_from_env(
158 self.tonic_config.metadata.unwrap_or_default(),
159 headers_from_env,
160 );
161
162 let add_metadata = move |mut req: tonic::Request<()>| {
163 for key_and_value in metadata.iter() {
164 match key_and_value {
165 KeyAndValueRef::Ascii(key, value) => {
166 req.metadata_mut().append(key, value.to_owned())
167 }
168 KeyAndValueRef::Binary(key, value) => {
169 req.metadata_mut().append_bin(key, value.to_owned())
170 }
171 };
172 }
173
174 Ok(req)
175 };
176
177 let interceptor = match self.tonic_config.interceptor {
178 Some(mut interceptor) => {
179 BoxInterceptor(Box::new(move |req| interceptor.call(add_metadata(req)?)))
180 }
181 None => BoxInterceptor(Box::new(add_metadata)),
182 };
183
184 if let Some(channel) = self.tonic_config.channel {
186 return Ok((channel, interceptor, compression));
187 }
188
189 let config = self.exporter_config;
190
191 let endpoint = Self::resolve_endpoint(signal_endpoint_var, config.endpoint);
192
193 let endpoint_clone = endpoint.clone();
195
196 let endpoint = Channel::from_shared(endpoint)
197 .map_err(|op| ExporterBuildError::InvalidUri(endpoint_clone.clone(), op.to_string()))?;
198 let timeout = resolve_timeout(signal_timeout_var, config.timeout.as_ref());
199
200 #[cfg(feature = "tls")]
201 let channel = match self.tonic_config.tls_config {
202 Some(tls_config) => endpoint
203 .tls_config(tls_config)
204 .map_err(|er| ExporterBuildError::InternalFailure(er.to_string()))?,
205 None => endpoint,
206 }
207 .timeout(timeout)
208 .connect_lazy();
209
210 #[cfg(not(feature = "tls"))]
211 let channel = endpoint.timeout(timeout).connect_lazy();
212
213 otel_debug!(name: "TonicChannelBuilt", endpoint = endpoint_clone, timeout_in_millisecs = timeout.as_millis(), compression = format!("{:?}", compression), headers = format!("{:?}", headers_for_logging));
214 Ok((channel, interceptor, compression))
215 }
216
217 fn resolve_endpoint(default_endpoint_var: &str, provided_endpoint: Option<String>) -> String {
218 if let Some(endpoint) = provided_endpoint.filter(|s| !s.is_empty()) {
226 endpoint
227 } else if let Ok(endpoint) = env::var(default_endpoint_var) {
228 endpoint
229 } else if let Ok(endpoint) = env::var(OTEL_EXPORTER_OTLP_ENDPOINT) {
230 endpoint
231 } else {
232 OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT.to_string()
233 }
234 }
235
236 fn resolve_compression(
237 &self,
238 env_override: &str,
239 ) -> Result<Option<CompressionEncoding>, ExporterBuildError> {
240 super::resolve_compression_from_env(self.tonic_config.compression, env_override)?
241 .map(|c| c.try_into())
242 .transpose()
243 }
244
245 #[cfg(feature = "logs")]
247 pub(crate) fn build_log_exporter(self) -> Result<crate::logs::LogExporter, ExporterBuildError> {
248 use crate::exporter::tonic::logs::TonicLogsClient;
249
250 otel_debug!(name: "LogsTonicChannelBuilding");
251
252 let (channel, interceptor, compression) = self.build_channel(
253 crate::logs::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
254 crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
255 crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
256 crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS,
257 )?;
258
259 let client = TonicLogsClient::new(channel, interceptor, compression);
260
261 Ok(crate::logs::LogExporter::from_tonic(client))
262 }
263
264 #[cfg(feature = "metrics")]
266 pub(crate) fn build_metrics_exporter(
267 self,
268 temporality: opentelemetry_sdk::metrics::Temporality,
269 ) -> Result<crate::MetricExporter, ExporterBuildError> {
270 use crate::MetricExporter;
271 use metrics::TonicMetricsClient;
272
273 otel_debug!(name: "MetricsTonicChannelBuilding");
274
275 let (channel, interceptor, compression) = self.build_channel(
276 crate::metric::OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
277 crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
278 crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
279 crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS,
280 )?;
281
282 let client = TonicMetricsClient::new(channel, interceptor, compression);
283
284 Ok(MetricExporter::from_tonic(client, temporality))
285 }
286
287 #[cfg(feature = "trace")]
289 pub(crate) fn build_span_exporter(self) -> Result<crate::SpanExporter, ExporterBuildError> {
290 use crate::exporter::tonic::trace::TonicTracesClient;
291
292 otel_debug!(name: "TracesTonicChannelBuilding");
293
294 let (channel, interceptor, compression) = self.build_channel(
295 crate::span::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
296 crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
297 crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
298 crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS,
299 )?;
300
301 let client = TonicTracesClient::new(channel, interceptor, compression);
302
303 Ok(crate::SpanExporter::from_tonic(client))
304 }
305}
306
307fn merge_metadata_with_headers_from_env(
308 metadata: MetadataMap,
309 headers_from_env: HeaderMap,
310) -> MetadataMap {
311 if headers_from_env.is_empty() {
312 metadata
313 } else {
314 let mut existing_headers: HeaderMap = metadata.into_headers();
315 existing_headers.extend(headers_from_env);
316
317 MetadataMap::from_headers(existing_headers)
318 }
319}
320
321fn parse_headers_from_env(signal_headers_var: &str) -> (HeaderMap, Vec<(String, String)>) {
322 let mut headers = Vec::new();
323
324 (
325 env::var(signal_headers_var)
326 .or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
327 .map(|input| {
328 parse_header_string(&input)
329 .filter_map(|(key, value)| {
330 headers.push((key.to_owned(), value.clone()));
331 Some((
332 HeaderName::from_str(key).ok()?,
333 HeaderValue::from_str(&value).ok()?,
334 ))
335 })
336 .collect::<HeaderMap>()
337 })
338 .unwrap_or_default(),
339 headers,
340 )
341}
342
/// Gives access to the [`TonicConfig`] stored inside a builder.
///
/// Every type implementing this trait receives the `with_*` methods of
/// [`WithTonicConfig`] through that trait's blanket implementation.
pub trait HasTonicConfig {
    /// Returns a mutable reference to the builder's [`TonicConfig`].
    fn tonic_config(&mut self) -> &mut TonicConfig;
}
348
349impl HasTonicConfig for TonicExporterBuilder {
351 fn tonic_config(&mut self) -> &mut TonicConfig {
352 &mut self.tonic_config
353 }
354}
355
/// Builder-style setters for the fields of [`TonicConfig`].
///
/// Implemented for every type that implements [`HasTonicConfig`]. Each method consumes
/// the builder and returns it, so calls can be chained.
pub trait WithTonicConfig {
    /// Sets the TLS configuration applied to the endpoint when the builder creates its
    /// own channel.
    #[cfg(feature = "tls")]
    fn with_tls_config(self, tls_config: ClientTlsConfig) -> Self;

    /// Adds metadata entries to send with every request.
    ///
    /// The call is additive: the entries are merged into the metadata already stored in
    /// the configuration, and an entry with an existing key replaces the stored value.
    fn with_metadata(self, metadata: MetadataMap) -> Self;

    /// Sets the compression algorithm in code. A value set here takes priority over the
    /// compression environment variables.
    fn with_compression(self, compression: Compression) -> Self;

    /// Uses `channel` as the transport channel.
    ///
    /// A channel supplied here is used as-is: the builder does not apply its endpoint,
    /// timeout or TLS configuration to it.
    fn with_channel(self, channel: tonic::transport::Channel) -> Self;

    /// Installs a custom interceptor for outgoing requests.
    ///
    /// The interceptor runs after the configured metadata has been appended to the
    /// request.
    fn with_interceptor<I>(self, interceptor: I) -> Self
    where
        I: tonic::service::Interceptor + Clone + Send + Sync + 'static;
}
479
480impl<B: HasTonicConfig> WithTonicConfig for B {
481 #[cfg(feature = "tls")]
482 fn with_tls_config(mut self, tls_config: ClientTlsConfig) -> Self {
483 self.tonic_config().tls_config = Some(tls_config);
484 self
485 }
486
487 fn with_metadata(mut self, metadata: MetadataMap) -> Self {
489 let mut existing_headers = self
491 .tonic_config()
492 .metadata
493 .clone()
494 .unwrap_or_default()
495 .into_headers();
496 existing_headers.extend(metadata.into_headers());
497
498 self.tonic_config().metadata = Some(MetadataMap::from_headers(existing_headers));
499 self
500 }
501
502 fn with_compression(mut self, compression: Compression) -> Self {
503 self.tonic_config().compression = Some(compression);
504 self
505 }
506
507 fn with_channel(mut self, channel: tonic::transport::Channel) -> Self {
508 self.tonic_config().channel = Some(channel);
509 self
510 }
511
512 fn with_interceptor<I>(mut self, interceptor: I) -> Self
513 where
514 I: tonic::service::Interceptor + Clone + Send + Sync + 'static,
515 {
516 self.tonic_config().interceptor = Some(BoxInterceptor(Box::new(interceptor)));
517 self
518 }
519}
520
#[cfg(test)]
mod tests {
    use crate::exporter::tests::run_env_test;
    use crate::exporter::tonic::WithTonicConfig;
    #[cfg(feature = "grpc-tonic")]
    use crate::exporter::Compression;
    use crate::{TonicExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT};
    use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS};
    use http::{HeaderMap, HeaderName, HeaderValue};
    use tonic::metadata::{MetadataMap, MetadataValue};

    /// `with_metadata` merges into the default metadata and overrides entries that share
    /// a key.
    #[test]
    fn test_with_metadata() {
        // A new key is added next to the default entries.
        let mut metadata = MetadataMap::new();
        metadata.insert("foo", "bar".parse().unwrap());
        let builder = TonicExporterBuilder::default().with_metadata(metadata);
        let result = builder.tonic_config.metadata.unwrap();
        let foo = result
            .get("foo")
            .expect("there to always be an entry for foo");
        assert_eq!(foo, &MetadataValue::try_from("bar").unwrap());
        assert!(result.get("User-Agent").is_some());

        // An existing key is replaced, so the number of entries does not grow.
        let mut metadata = MetadataMap::new();
        metadata.insert("user-agent", "baz".parse().unwrap());
        let builder = TonicExporterBuilder::default().with_metadata(metadata);
        let result = builder.tonic_config.metadata.unwrap();
        assert_eq!(
            result.get("User-Agent").unwrap(),
            &MetadataValue::try_from("baz").unwrap()
        );
        assert_eq!(
            result.len(),
            TonicExporterBuilder::default()
                .tonic_config
                .metadata
                .unwrap()
                .len()
        );
    }

    /// `with_compression` stores the gzip setting.
    #[test]
    #[cfg(feature = "gzip-tonic")]
    fn test_with_gzip_compression() {
        let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
    }

    /// `with_compression` stores the zstd setting.
    #[test]
    #[cfg(feature = "zstd-tonic")]
    fn test_with_zstd_compression() {
        let builder = TonicExporterBuilder::default().with_compression(Compression::Zstd);
        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Zstd);
    }

    /// Conversion to tonic's encoding succeeds exactly when the matching feature is on.
    #[test]
    fn test_convert_compression() {
        #[cfg(feature = "gzip-tonic")]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_ok());
        #[cfg(not(feature = "gzip-tonic"))]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_err());
        #[cfg(feature = "zstd-tonic")]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_ok());
        #[cfg(not(feature = "zstd-tonic"))]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_err());
    }

    /// The signal-specific compression variable wins over the generic one.
    #[cfg(feature = "zstd-tonic")]
    #[test]
    fn test_priority_of_signal_env_over_generic_env_for_compression() {
        run_env_test(
            vec![
                (crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, "zstd"),
                (crate::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"),
            ],
            || {
                let builder = TonicExporterBuilder::default();

                let compression = builder
                    .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
                    .unwrap();
                assert_eq!(compression, Some(tonic::codec::CompressionEncoding::Zstd));
            },
        );
    }

    /// Compression configured in code wins over both environment variables.
    #[cfg(feature = "zstd-tonic")]
    #[test]
    fn test_priority_of_code_based_config_over_envs_for_compression() {
        run_env_test(
            vec![
                (crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, "gzip"),
                (crate::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"),
            ],
            || {
                let builder = TonicExporterBuilder::default().with_compression(Compression::Zstd);

                let compression = builder
                    .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
                    .unwrap();
                assert_eq!(compression, Some(tonic::codec::CompressionEncoding::Zstd));
            },
        );
    }

    /// With nothing configured, no compression is selected.
    #[test]
    fn test_use_default_when_others_missing_for_compression() {
        run_env_test(vec![], || {
            let builder = TonicExporterBuilder::default();

            let compression = builder
                .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
                .unwrap();
            assert!(compression.is_none());
        });
    }

    /// Signal-specific headers are used when set; the generic variable is the fallback.
    #[test]
    fn test_parse_headers_from_env() {
        run_env_test(
            vec![
                (OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"),
                (OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"),
            ],
            || {
                assert_eq!(
                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS).0,
                    HeaderMap::from_iter([
                        (
                            HeaderName::from_static("k1"),
                            HeaderValue::from_static("v1")
                        ),
                        (
                            HeaderName::from_static("k2"),
                            HeaderValue::from_static("v2")
                        ),
                    ])
                );

                assert_eq!(
                    super::parse_headers_from_env("EMPTY_ENV").0,
                    HeaderMap::from_iter([(
                        HeaderName::from_static("k3"),
                        HeaderValue::from_static("v3")
                    )])
                );
            },
        )
    }

    /// Headers from the environment override metadata entries with the same key and
    /// leave the others untouched.
    #[test]
    fn test_merge_metadata_with_headers_from_env() {
        run_env_test(
            vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")],
            || {
                let headers_from_env =
                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS);

                let mut metadata = MetadataMap::new();
                metadata.insert("foo", "bar".parse().unwrap());
                metadata.insert("k1", "v0".parse().unwrap());

                let result =
                    super::merge_metadata_with_headers_from_env(metadata, headers_from_env.0);

                assert_eq!(
                    result.get("foo").unwrap(),
                    MetadataValue::from_static("bar")
                );
                assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1"));
                assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2"));
            },
        );
    }

    /// The signal-specific endpoint variable wins over the generic one.
    #[test]
    fn test_priority_of_signal_env_over_generic_env_for_endpoint() {
        run_env_test(
            vec![
                (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://localhost:1234"),
                (super::OTEL_EXPORTER_OTLP_ENDPOINT, "http://localhost:2345"),
            ],
            || {
                let url = TonicExporterBuilder::resolve_endpoint(
                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                    None,
                );
                assert_eq!(url, "http://localhost:1234");
            },
        );
    }

    /// An endpoint configured in code wins over both environment variables.
    #[test]
    fn test_priority_of_code_based_config_over_envs_for_endpoint() {
        run_env_test(
            vec![
                (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://localhost:1234"),
                (super::OTEL_EXPORTER_OTLP_ENDPOINT, "http://localhost:2345"),
            ],
            || {
                let url = TonicExporterBuilder::resolve_endpoint(
                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                    Some("http://localhost:3456".to_string()),
                );
                assert_eq!(url, "http://localhost:3456");
            },
        );
    }

    /// With nothing configured, the built-in gRPC default endpoint is used.
    #[test]
    fn test_use_default_when_others_missing_for_endpoint() {
        run_env_test(vec![], || {
            let url =
                TonicExporterBuilder::resolve_endpoint(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, None);
            assert_eq!(url, "http://localhost:4317");
        });
    }

    /// An empty endpoint configured in code is treated as not set.
    #[test]
    fn test_use_default_when_empty_string_for_option() {
        run_env_test(vec![], || {
            let url = TonicExporterBuilder::resolve_endpoint(
                OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                Some(String::new()),
            );
            assert_eq!(url, "http://localhost:4317");
        });
    }
}