1use std::env;
2use std::fmt::{Debug, Formatter};
3use std::str::FromStr;
4
5use http::{HeaderMap, HeaderName, HeaderValue};
6use opentelemetry::otel_debug;
7use tonic::codec::CompressionEncoding;
8use tonic::metadata::{KeyAndValueRef, MetadataMap};
9use tonic::service::Interceptor;
10use tonic::transport::Channel;
11#[cfg(feature = "tls")]
12use tonic::transport::ClientTlsConfig;
13
14use super::{default_headers, parse_header_string, OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT};
15use super::{resolve_timeout, ExporterBuildError};
16use crate::exporter::Compression;
17use crate::{
18 ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT,
19 OTEL_EXPORTER_OTLP_HEADERS,
20};
21
22#[cfg(feature = "logs")]
23pub(crate) mod logs;
24
25#[cfg(feature = "metrics")]
26pub(crate) mod metrics;
27
28#[cfg(feature = "trace")]
29pub(crate) mod trace;
30
31#[derive(Debug, Default)]
35#[non_exhaustive]
36pub struct TonicConfig {
37 pub(crate) metadata: Option<MetadataMap>,
39 #[cfg(feature = "tls")]
41 pub(crate) tls_config: Option<ClientTlsConfig>,
42 pub(crate) compression: Option<Compression>,
44 pub(crate) channel: Option<tonic::transport::Channel>,
45 pub(crate) interceptor: Option<BoxInterceptor>,
46}
47
48impl TryFrom<Compression> for tonic::codec::CompressionEncoding {
49 type Error = ExporterBuildError;
50
51 fn try_from(value: Compression) -> Result<Self, ExporterBuildError> {
52 match value {
53 #[cfg(feature = "gzip-tonic")]
54 Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip),
55 #[cfg(not(feature = "gzip-tonic"))]
56 Compression::Gzip => Err(ExporterBuildError::FeatureRequiredForCompressionAlgorithm(
57 "gzip-tonic",
58 Compression::Gzip,
59 )),
60 #[cfg(feature = "zstd-tonic")]
61 Compression::Zstd => Ok(tonic::codec::CompressionEncoding::Zstd),
62 #[cfg(not(feature = "zstd-tonic"))]
63 Compression::Zstd => Err(ExporterBuildError::FeatureRequiredForCompressionAlgorithm(
64 "zstd-tonic",
65 Compression::Zstd,
66 )),
67 }
68 }
69}
70
71#[derive(Debug)]
106pub struct TonicExporterBuilder {
107 pub(crate) tonic_config: TonicConfig,
108 pub(crate) exporter_config: ExportConfig,
109}
110
111pub(crate) struct BoxInterceptor(Box<dyn Interceptor + Send + Sync>);
112impl tonic::service::Interceptor for BoxInterceptor {
113 fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
114 self.0.call(request)
115 }
116}
117
118impl Debug for BoxInterceptor {
119 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
120 write!(f, "BoxInterceptor(..)")
121 }
122}
123
124impl Default for TonicExporterBuilder {
125 fn default() -> Self {
126 TonicExporterBuilder {
127 tonic_config: TonicConfig {
128 metadata: Some(MetadataMap::from_headers(
129 (&default_headers())
130 .try_into()
131 .expect("Invalid tonic headers"),
132 )),
133 #[cfg(feature = "tls")]
134 tls_config: None,
135 compression: None,
136 channel: Option::default(),
137 interceptor: Option::default(),
138 },
139 exporter_config: ExportConfig {
140 protocol: crate::Protocol::Grpc,
141 ..Default::default()
142 },
143 }
144 }
145}
146
147impl TonicExporterBuilder {
148 #[allow(unused)]
150 fn build_channel(
151 self,
152 signal_endpoint_var: &str,
153 signal_timeout_var: &str,
154 signal_compression_var: &str,
155 signal_headers_var: &str,
156 ) -> Result<(Channel, BoxInterceptor, Option<CompressionEncoding>), ExporterBuildError> {
157 let compression = self.resolve_compression(signal_compression_var)?;
158
159 let (headers_from_env, headers_for_logging) = parse_headers_from_env(signal_headers_var);
160 let metadata = merge_metadata_with_headers_from_env(
161 self.tonic_config.metadata.unwrap_or_default(),
162 headers_from_env,
163 );
164
165 let add_metadata = move |mut req: tonic::Request<()>| {
166 for key_and_value in metadata.iter() {
167 match key_and_value {
168 KeyAndValueRef::Ascii(key, value) => {
169 req.metadata_mut().append(key, value.to_owned())
170 }
171 KeyAndValueRef::Binary(key, value) => {
172 req.metadata_mut().append_bin(key, value.to_owned())
173 }
174 };
175 }
176
177 Ok(req)
178 };
179
180 let interceptor = match self.tonic_config.interceptor {
181 Some(mut interceptor) => {
182 BoxInterceptor(Box::new(move |req| interceptor.call(add_metadata(req)?)))
183 }
184 None => BoxInterceptor(Box::new(add_metadata)),
185 };
186
187 if let Some(channel) = self.tonic_config.channel {
189 return Ok((channel, interceptor, compression));
190 }
191
192 let config = self.exporter_config;
193
194 let endpoint = Self::resolve_endpoint(signal_endpoint_var, config.endpoint);
195
196 let endpoint_clone = endpoint.clone();
198
199 let endpoint = Channel::from_shared(endpoint)
200 .map_err(|op| ExporterBuildError::InvalidUri(endpoint_clone.clone(), op.to_string()))?;
201 let timeout = resolve_timeout(signal_timeout_var, config.timeout.as_ref());
202
203 #[cfg(feature = "tls")]
204 let channel = match self.tonic_config.tls_config {
205 Some(tls_config) => endpoint
206 .tls_config(tls_config)
207 .map_err(|er| ExporterBuildError::InternalFailure(er.to_string()))?,
208 None => endpoint,
209 }
210 .timeout(timeout)
211 .connect_lazy();
212
213 #[cfg(not(feature = "tls"))]
214 let channel = endpoint.timeout(timeout).connect_lazy();
215
216 otel_debug!(name: "TonicChannelBuilt", endpoint = endpoint_clone, timeout_in_millisecs = timeout.as_millis(), compression = format!("{:?}", compression), headers = format!("{:?}", headers_for_logging));
217 Ok((channel, interceptor, compression))
218 }
219
220 fn resolve_endpoint(default_endpoint_var: &str, provided_endpoint: Option<String>) -> String {
221 if let Some(endpoint) = provided_endpoint {
229 endpoint
230 } else if let Ok(endpoint) = env::var(default_endpoint_var) {
231 endpoint
232 } else if let Ok(endpoint) = env::var(OTEL_EXPORTER_OTLP_ENDPOINT) {
233 endpoint
234 } else {
235 OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT.to_string()
236 }
237 }
238
239 fn resolve_compression(
240 &self,
241 env_override: &str,
242 ) -> Result<Option<CompressionEncoding>, ExporterBuildError> {
243 if let Some(compression) = self.tonic_config.compression {
244 Ok(Some(compression.try_into()?))
245 } else if let Ok(compression) = env::var(env_override) {
246 Ok(Some(compression.parse::<Compression>()?.try_into()?))
247 } else if let Ok(compression) = env::var(OTEL_EXPORTER_OTLP_COMPRESSION) {
248 Ok(Some(compression.parse::<Compression>()?.try_into()?))
249 } else {
250 Ok(None)
251 }
252 }
253
254 #[cfg(feature = "logs")]
256 pub(crate) fn build_log_exporter(self) -> Result<crate::logs::LogExporter, ExporterBuildError> {
257 use crate::exporter::tonic::logs::TonicLogsClient;
258
259 otel_debug!(name: "LogsTonicChannelBuilding");
260
261 let (channel, interceptor, compression) = self.build_channel(
262 crate::logs::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
263 crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
264 crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
265 crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS,
266 )?;
267
268 let client = TonicLogsClient::new(channel, interceptor, compression);
269
270 Ok(crate::logs::LogExporter::from_tonic(client))
271 }
272
273 #[cfg(feature = "metrics")]
275 pub(crate) fn build_metrics_exporter(
276 self,
277 temporality: opentelemetry_sdk::metrics::Temporality,
278 ) -> Result<crate::MetricExporter, ExporterBuildError> {
279 use crate::MetricExporter;
280 use metrics::TonicMetricsClient;
281
282 otel_debug!(name: "MetricsTonicChannelBuilding");
283
284 let (channel, interceptor, compression) = self.build_channel(
285 crate::metric::OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
286 crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
287 crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
288 crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS,
289 )?;
290
291 let client = TonicMetricsClient::new(channel, interceptor, compression);
292
293 Ok(MetricExporter::from_tonic(client, temporality))
294 }
295
296 #[cfg(feature = "trace")]
298 pub(crate) fn build_span_exporter(self) -> Result<crate::SpanExporter, ExporterBuildError> {
299 use crate::exporter::tonic::trace::TonicTracesClient;
300
301 otel_debug!(name: "TracesTonicChannelBuilding");
302
303 let (channel, interceptor, compression) = self.build_channel(
304 crate::span::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
305 crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
306 crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
307 crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS,
308 )?;
309
310 let client = TonicTracesClient::new(channel, interceptor, compression);
311
312 Ok(crate::SpanExporter::from_tonic(client))
313 }
314}
315
316fn merge_metadata_with_headers_from_env(
317 metadata: MetadataMap,
318 headers_from_env: HeaderMap,
319) -> MetadataMap {
320 if headers_from_env.is_empty() {
321 metadata
322 } else {
323 let mut existing_headers: HeaderMap = metadata.into_headers();
324 existing_headers.extend(headers_from_env);
325
326 MetadataMap::from_headers(existing_headers)
327 }
328}
329
330fn parse_headers_from_env(signal_headers_var: &str) -> (HeaderMap, Vec<(String, String)>) {
331 let mut headers = Vec::new();
332
333 (
334 env::var(signal_headers_var)
335 .or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
336 .map(|input| {
337 parse_header_string(&input)
338 .filter_map(|(key, value)| {
339 headers.push((key.to_owned(), value.clone()));
340 Some((
341 HeaderName::from_str(key).ok()?,
342 HeaderValue::from_str(&value).ok()?,
343 ))
344 })
345 .collect::<HeaderMap>()
346 })
347 .unwrap_or_default(),
348 headers,
349 )
350}
351
352pub trait HasTonicConfig {
354 fn tonic_config(&mut self) -> &mut TonicConfig;
356}
357
358impl HasTonicConfig for TonicExporterBuilder {
360 fn tonic_config(&mut self) -> &mut TonicConfig {
361 &mut self.tonic_config
362 }
363}
364
365pub trait WithTonicConfig {
380 #[cfg(feature = "tls")]
382 fn with_tls_config(self, tls_config: ClientTlsConfig) -> Self;
383
384 fn with_metadata(self, metadata: MetadataMap) -> Self;
386
387 fn with_compression(self, compression: Compression) -> Self;
389
390 fn with_channel(self, channel: tonic::transport::Channel) -> Self;
397
398 fn with_interceptor<I>(self, interceptor: I) -> Self
402 where
403 I: tonic::service::Interceptor + Clone + Send + Sync + 'static;
404}
405
406impl<B: HasTonicConfig> WithTonicConfig for B {
407 #[cfg(feature = "tls")]
408 fn with_tls_config(mut self, tls_config: ClientTlsConfig) -> Self {
409 self.tonic_config().tls_config = Some(tls_config);
410 self
411 }
412
413 fn with_metadata(mut self, metadata: MetadataMap) -> Self {
415 let mut existing_headers = self
417 .tonic_config()
418 .metadata
419 .clone()
420 .unwrap_or_default()
421 .into_headers();
422 existing_headers.extend(metadata.into_headers());
423
424 self.tonic_config().metadata = Some(MetadataMap::from_headers(existing_headers));
425 self
426 }
427
428 fn with_compression(mut self, compression: Compression) -> Self {
429 self.tonic_config().compression = Some(compression);
430 self
431 }
432
433 fn with_channel(mut self, channel: tonic::transport::Channel) -> Self {
434 self.tonic_config().channel = Some(channel);
435 self
436 }
437
438 fn with_interceptor<I>(mut self, interceptor: I) -> Self
439 where
440 I: tonic::service::Interceptor + Clone + Send + Sync + 'static,
441 {
442 self.tonic_config().interceptor = Some(BoxInterceptor(Box::new(interceptor)));
443 self
444 }
445}
446
#[cfg(test)]
mod tests {
    use crate::exporter::tests::run_env_test;
    use crate::exporter::tonic::WithTonicConfig;
    #[cfg(feature = "grpc-tonic")]
    use crate::exporter::Compression;
    use crate::{TonicExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT};
    use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS};
    use http::{HeaderMap, HeaderName, HeaderValue};
    use tonic::metadata::{MetadataMap, MetadataValue};

    #[test]
    fn test_with_metadata() {
        // New entries are added next to the default headers.
        let mut metadata = MetadataMap::new();
        metadata.insert("foo", "bar".parse().unwrap());
        let builder = TonicExporterBuilder::default().with_metadata(metadata);
        let result = builder.tonic_config.metadata.unwrap();
        let foo = result
            .get("foo")
            .expect("there to always be an entry for foo");
        assert_eq!(foo, &MetadataValue::try_from("bar").unwrap());
        assert!(result.get("User-Agent").is_some());

        // An entry with an existing name replaces it instead of duplicating it.
        let mut metadata = MetadataMap::new();
        metadata.insert("user-agent", "baz".parse().unwrap());
        let builder = TonicExporterBuilder::default().with_metadata(metadata);
        let result = builder.tonic_config.metadata.unwrap();
        assert_eq!(
            result.get("User-Agent").unwrap(),
            &MetadataValue::try_from("baz").unwrap()
        );
        assert_eq!(
            result.len(),
            TonicExporterBuilder::default()
                .tonic_config
                .metadata
                .unwrap()
                .len()
        );
    }

    #[test]
    #[cfg(feature = "gzip-tonic")]
    fn test_with_gzip_compression() {
        let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
    }

    #[test]
    #[cfg(feature = "zstd-tonic")]
    fn test_with_zstd_compression() {
        let builder = TonicExporterBuilder::default().with_compression(Compression::Zstd);
        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Zstd);
    }

    #[test]
    fn test_convert_compression() {
        // The conversion succeeds exactly when the matching feature is enabled.
        #[cfg(feature = "gzip-tonic")]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_ok());
        #[cfg(not(feature = "gzip-tonic"))]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_err());
        #[cfg(feature = "zstd-tonic")]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_ok());
        #[cfg(not(feature = "zstd-tonic"))]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_err());
    }

    #[cfg(feature = "zstd-tonic")]
    #[test]
    fn test_priority_of_signal_env_over_generic_env_for_compression() {
        run_env_test(
            vec![
                (crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, "zstd"),
                (super::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"),
            ],
            || {
                let builder = TonicExporterBuilder::default();

                let compression = builder
                    .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
                    .unwrap();
                assert_eq!(compression, Some(tonic::codec::CompressionEncoding::Zstd));
            },
        );
    }

    #[cfg(feature = "zstd-tonic")]
    #[test]
    fn test_priority_of_code_based_config_over_envs_for_compression() {
        run_env_test(
            vec![
                (crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, "gzip"),
                (super::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"),
            ],
            || {
                let builder = TonicExporterBuilder::default().with_compression(Compression::Zstd);

                let compression = builder
                    .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
                    .unwrap();
                assert_eq!(compression, Some(tonic::codec::CompressionEncoding::Zstd));
            },
        );
    }

    #[test]
    fn test_use_default_when_others_missing_for_compression() {
        run_env_test(vec![], || {
            let builder = TonicExporterBuilder::default();

            let compression = builder
                .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
                .unwrap();
            assert!(compression.is_none());
        });
    }

    #[test]
    fn test_parse_headers_from_env() {
        run_env_test(
            vec![
                (OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"),
                (OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"),
            ],
            || {
                // The signal-specific variable wins when it is set.
                assert_eq!(
                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS).0,
                    HeaderMap::from_iter([
                        (
                            HeaderName::from_static("k1"),
                            HeaderValue::from_static("v1")
                        ),
                        (
                            HeaderName::from_static("k2"),
                            HeaderValue::from_static("v2")
                        ),
                    ])
                );

                // An unset signal variable falls back to the generic one.
                assert_eq!(
                    super::parse_headers_from_env("EMPTY_ENV").0,
                    HeaderMap::from_iter([(
                        HeaderName::from_static("k3"),
                        HeaderValue::from_static("v3")
                    )])
                );
            },
        )
    }

    #[test]
    fn test_merge_metadata_with_headers_from_env() {
        run_env_test(
            vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")],
            || {
                let (headers_from_env, _) =
                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS);

                let mut metadata = MetadataMap::new();
                metadata.insert("foo", "bar".parse().unwrap());
                metadata.insert("k1", "v0".parse().unwrap());

                let result =
                    super::merge_metadata_with_headers_from_env(metadata, headers_from_env);

                // Untouched entry survives; the environment overrides `k1` and adds `k2`.
                assert_eq!(
                    result.get("foo").unwrap(),
                    MetadataValue::from_static("bar")
                );
                assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1"));
                assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2"));
            },
        );
    }

    #[test]
    fn test_priority_of_signal_env_over_generic_env_for_endpoint() {
        run_env_test(
            vec![
                (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://localhost:1234"),
                (super::OTEL_EXPORTER_OTLP_ENDPOINT, "http://localhost:2345"),
            ],
            || {
                let url = TonicExporterBuilder::resolve_endpoint(
                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                    None,
                );
                assert_eq!(url, "http://localhost:1234");
            },
        );
    }

    #[test]
    fn test_priority_of_code_based_config_over_envs_for_endpoint() {
        run_env_test(
            vec![
                (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://localhost:1234"),
                (super::OTEL_EXPORTER_OTLP_ENDPOINT, "http://localhost:2345"),
            ],
            || {
                let url = TonicExporterBuilder::resolve_endpoint(
                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                    Some("http://localhost:3456".to_string()),
                );
                assert_eq!(url, "http://localhost:3456");
            },
        );
    }

    #[test]
    fn test_use_default_when_others_missing_for_endpoint() {
        run_env_test(vec![], || {
            let url =
                TonicExporterBuilder::resolve_endpoint(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, None);
            assert_eq!(url, "http://localhost:4317");
        });
    }
}