1use std::env;
2use std::fmt::{Debug, Formatter};
3use std::str::FromStr;
4
5use http::{HeaderMap, HeaderName, HeaderValue};
6use opentelemetry::otel_debug;
7use tonic::codec::CompressionEncoding;
8use tonic::metadata::{KeyAndValueRef, MetadataMap};
9use tonic::service::Interceptor;
10use tonic::transport::Channel;
11#[cfg(feature = "tls")]
12use tonic::transport::ClientTlsConfig;
13
14use super::{default_headers, parse_header_string, OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT};
15use super::{resolve_timeout, ExporterBuildError};
16use crate::exporter::Compression;
17use crate::{ExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS};
18
19#[cfg(feature = "logs")]
20pub(crate) mod logs;
21
22#[cfg(feature = "metrics")]
23pub(crate) mod metrics;
24
25#[cfg(feature = "trace")]
26pub(crate) mod trace;
27
28#[derive(Debug, Default)]
32#[non_exhaustive]
33pub struct TonicConfig {
34    pub(crate) metadata: Option<MetadataMap>,
36    #[cfg(feature = "tls")]
38    pub(crate) tls_config: Option<ClientTlsConfig>,
39    pub(crate) compression: Option<Compression>,
41    pub(crate) channel: Option<tonic::transport::Channel>,
42    pub(crate) interceptor: Option<BoxInterceptor>,
43}
44
45impl TryFrom<Compression> for tonic::codec::CompressionEncoding {
46    type Error = ExporterBuildError;
47
48    fn try_from(value: Compression) -> Result<Self, ExporterBuildError> {
49        match value {
50            #[cfg(feature = "gzip-tonic")]
51            Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip),
52            #[cfg(not(feature = "gzip-tonic"))]
53            Compression::Gzip => Err(ExporterBuildError::FeatureRequiredForCompressionAlgorithm(
54                "gzip-tonic",
55                Compression::Gzip,
56            )),
57            #[cfg(feature = "zstd-tonic")]
58            Compression::Zstd => Ok(tonic::codec::CompressionEncoding::Zstd),
59            #[cfg(not(feature = "zstd-tonic"))]
60            Compression::Zstd => Err(ExporterBuildError::FeatureRequiredForCompressionAlgorithm(
61                "zstd-tonic",
62                Compression::Zstd,
63            )),
64        }
65    }
66}
67
68#[derive(Debug)]
103pub struct TonicExporterBuilder {
104    pub(crate) tonic_config: TonicConfig,
105    pub(crate) exporter_config: ExportConfig,
106}
107
108pub(crate) struct BoxInterceptor(Box<dyn Interceptor + Send + Sync>);
109impl tonic::service::Interceptor for BoxInterceptor {
110    fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
111        self.0.call(request)
112    }
113}
114
115impl Debug for BoxInterceptor {
116    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
117        write!(f, "BoxInterceptor(..)")
118    }
119}
120
121impl Default for TonicExporterBuilder {
122    fn default() -> Self {
123        TonicExporterBuilder {
124            tonic_config: TonicConfig {
125                metadata: Some(MetadataMap::from_headers(
126                    (&default_headers())
127                        .try_into()
128                        .expect("Invalid tonic headers"),
129                )),
130                #[cfg(feature = "tls")]
131                tls_config: None,
132                compression: None,
133                channel: Option::default(),
134                interceptor: Option::default(),
135            },
136            exporter_config: ExportConfig {
137                protocol: crate::Protocol::Grpc,
138                ..Default::default()
139            },
140        }
141    }
142}
143
144impl TonicExporterBuilder {
145    #[allow(unused)]
147    fn build_channel(
148        self,
149        signal_endpoint_var: &str,
150        signal_timeout_var: &str,
151        signal_compression_var: &str,
152        signal_headers_var: &str,
153    ) -> Result<(Channel, BoxInterceptor, Option<CompressionEncoding>), ExporterBuildError> {
154        let compression = self.resolve_compression(signal_compression_var)?;
155
156        let (headers_from_env, headers_for_logging) = parse_headers_from_env(signal_headers_var);
157        let metadata = merge_metadata_with_headers_from_env(
158            self.tonic_config.metadata.unwrap_or_default(),
159            headers_from_env,
160        );
161
162        let add_metadata = move |mut req: tonic::Request<()>| {
163            for key_and_value in metadata.iter() {
164                match key_and_value {
165                    KeyAndValueRef::Ascii(key, value) => {
166                        req.metadata_mut().append(key, value.to_owned())
167                    }
168                    KeyAndValueRef::Binary(key, value) => {
169                        req.metadata_mut().append_bin(key, value.to_owned())
170                    }
171                };
172            }
173
174            Ok(req)
175        };
176
177        let interceptor = match self.tonic_config.interceptor {
178            Some(mut interceptor) => {
179                BoxInterceptor(Box::new(move |req| interceptor.call(add_metadata(req)?)))
180            }
181            None => BoxInterceptor(Box::new(add_metadata)),
182        };
183
184        if let Some(channel) = self.tonic_config.channel {
186            return Ok((channel, interceptor, compression));
187        }
188
189        let config = self.exporter_config;
190
191        let endpoint = Self::resolve_endpoint(signal_endpoint_var, config.endpoint);
192
193        let endpoint_clone = endpoint.clone();
195
196        let endpoint = Channel::from_shared(endpoint)
197            .map_err(|op| ExporterBuildError::InvalidUri(endpoint_clone.clone(), op.to_string()))?;
198        let timeout = resolve_timeout(signal_timeout_var, config.timeout.as_ref());
199
200        #[cfg(feature = "tls")]
201        let channel = match self.tonic_config.tls_config {
202            Some(tls_config) => endpoint
203                .tls_config(tls_config)
204                .map_err(|er| ExporterBuildError::InternalFailure(er.to_string()))?,
205            None => endpoint,
206        }
207        .timeout(timeout)
208        .connect_lazy();
209
210        #[cfg(not(feature = "tls"))]
211        let channel = endpoint.timeout(timeout).connect_lazy();
212
213        otel_debug!(name: "TonicChannelBuilt", endpoint = endpoint_clone, timeout_in_millisecs = timeout.as_millis(), compression = format!("{:?}", compression), headers = format!("{:?}", headers_for_logging));
214        Ok((channel, interceptor, compression))
215    }
216
217    fn resolve_endpoint(default_endpoint_var: &str, provided_endpoint: Option<String>) -> String {
218        if let Some(endpoint) = provided_endpoint.filter(|s| !s.is_empty()) {
226            endpoint
227        } else if let Ok(endpoint) = env::var(default_endpoint_var) {
228            endpoint
229        } else if let Ok(endpoint) = env::var(OTEL_EXPORTER_OTLP_ENDPOINT) {
230            endpoint
231        } else {
232            OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT.to_string()
233        }
234    }
235
236    fn resolve_compression(
237        &self,
238        env_override: &str,
239    ) -> Result<Option<CompressionEncoding>, ExporterBuildError> {
240        super::resolve_compression_from_env(self.tonic_config.compression, env_override)?
241            .map(|c| c.try_into())
242            .transpose()
243    }
244
245    #[cfg(feature = "logs")]
247    pub(crate) fn build_log_exporter(self) -> Result<crate::logs::LogExporter, ExporterBuildError> {
248        use crate::exporter::tonic::logs::TonicLogsClient;
249
250        otel_debug!(name: "LogsTonicChannelBuilding");
251
252        let (channel, interceptor, compression) = self.build_channel(
253            crate::logs::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
254            crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
255            crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
256            crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS,
257        )?;
258
259        let client = TonicLogsClient::new(channel, interceptor, compression);
260
261        Ok(crate::logs::LogExporter::from_tonic(client))
262    }
263
264    #[cfg(feature = "metrics")]
266    pub(crate) fn build_metrics_exporter(
267        self,
268        temporality: opentelemetry_sdk::metrics::Temporality,
269    ) -> Result<crate::MetricExporter, ExporterBuildError> {
270        use crate::MetricExporter;
271        use metrics::TonicMetricsClient;
272
273        otel_debug!(name: "MetricsTonicChannelBuilding");
274
275        let (channel, interceptor, compression) = self.build_channel(
276            crate::metric::OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
277            crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
278            crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
279            crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS,
280        )?;
281
282        let client = TonicMetricsClient::new(channel, interceptor, compression);
283
284        Ok(MetricExporter::from_tonic(client, temporality))
285    }
286
287    #[cfg(feature = "trace")]
289    pub(crate) fn build_span_exporter(self) -> Result<crate::SpanExporter, ExporterBuildError> {
290        use crate::exporter::tonic::trace::TonicTracesClient;
291
292        otel_debug!(name: "TracesTonicChannelBuilding");
293
294        let (channel, interceptor, compression) = self.build_channel(
295            crate::span::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
296            crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
297            crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
298            crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS,
299        )?;
300
301        let client = TonicTracesClient::new(channel, interceptor, compression);
302
303        Ok(crate::SpanExporter::from_tonic(client))
304    }
305}
306
307fn merge_metadata_with_headers_from_env(
308    metadata: MetadataMap,
309    headers_from_env: HeaderMap,
310) -> MetadataMap {
311    if headers_from_env.is_empty() {
312        metadata
313    } else {
314        let mut existing_headers: HeaderMap = metadata.into_headers();
315        existing_headers.extend(headers_from_env);
316
317        MetadataMap::from_headers(existing_headers)
318    }
319}
320
321fn parse_headers_from_env(signal_headers_var: &str) -> (HeaderMap, Vec<(String, String)>) {
322    let mut headers = Vec::new();
323
324    (
325        env::var(signal_headers_var)
326            .or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
327            .map(|input| {
328                parse_header_string(&input)
329                    .filter_map(|(key, value)| {
330                        headers.push((key.to_owned(), value.clone()));
331                        Some((
332                            HeaderName::from_str(key).ok()?,
333                            HeaderValue::from_str(&value).ok()?,
334                        ))
335                    })
336                    .collect::<HeaderMap>()
337            })
338            .unwrap_or_default(),
339        headers,
340    )
341}
342
343pub trait HasTonicConfig {
345    fn tonic_config(&mut self) -> &mut TonicConfig;
347}
348
349impl HasTonicConfig for TonicExporterBuilder {
351    fn tonic_config(&mut self) -> &mut TonicConfig {
352        &mut self.tonic_config
353    }
354}
355
356pub trait WithTonicConfig {
371    #[cfg(feature = "tls")]
373    fn with_tls_config(self, tls_config: ClientTlsConfig) -> Self;
374
375    fn with_metadata(self, metadata: MetadataMap) -> Self;
403
404    fn with_compression(self, compression: Compression) -> Self;
406
407    fn with_channel(self, channel: tonic::transport::Channel) -> Self;
414
415    fn with_interceptor<I>(self, interceptor: I) -> Self
476    where
477        I: tonic::service::Interceptor + Clone + Send + Sync + 'static;
478}
479
480impl<B: HasTonicConfig> WithTonicConfig for B {
481    #[cfg(feature = "tls")]
482    fn with_tls_config(mut self, tls_config: ClientTlsConfig) -> Self {
483        self.tonic_config().tls_config = Some(tls_config);
484        self
485    }
486
487    fn with_metadata(mut self, metadata: MetadataMap) -> Self {
489        let mut existing_headers = self
491            .tonic_config()
492            .metadata
493            .clone()
494            .unwrap_or_default()
495            .into_headers();
496        existing_headers.extend(metadata.into_headers());
497
498        self.tonic_config().metadata = Some(MetadataMap::from_headers(existing_headers));
499        self
500    }
501
502    fn with_compression(mut self, compression: Compression) -> Self {
503        self.tonic_config().compression = Some(compression);
504        self
505    }
506
507    fn with_channel(mut self, channel: tonic::transport::Channel) -> Self {
508        self.tonic_config().channel = Some(channel);
509        self
510    }
511
512    fn with_interceptor<I>(mut self, interceptor: I) -> Self
513    where
514        I: tonic::service::Interceptor + Clone + Send + Sync + 'static,
515    {
516        self.tonic_config().interceptor = Some(BoxInterceptor(Box::new(interceptor)));
517        self
518    }
519}
520
521#[cfg(test)]
522mod tests {
523    use crate::exporter::tests::run_env_test;
524    use crate::exporter::tonic::WithTonicConfig;
525    #[cfg(feature = "grpc-tonic")]
526    use crate::exporter::Compression;
527    use crate::{TonicExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT};
528    use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS};
529    use http::{HeaderMap, HeaderName, HeaderValue};
530    use tonic::metadata::{MetadataMap, MetadataValue};
531
532    #[test]
533    fn test_with_metadata() {
534        let mut metadata = MetadataMap::new();
536        metadata.insert("foo", "bar".parse().unwrap());
537        let builder = TonicExporterBuilder::default().with_metadata(metadata);
538        let result = builder.tonic_config.metadata.unwrap();
539        let foo = result
540            .get("foo")
541            .expect("there to always be an entry for foo");
542        assert_eq!(foo, &MetadataValue::try_from("bar").unwrap());
543        assert!(result.get("User-Agent").is_some());
544
545        let mut metadata = MetadataMap::new();
547        metadata.insert("user-agent", "baz".parse().unwrap());
548        let builder = TonicExporterBuilder::default().with_metadata(metadata);
549        let result = builder.tonic_config.metadata.unwrap();
550        assert_eq!(
551            result.get("User-Agent").unwrap(),
552            &MetadataValue::try_from("baz").unwrap()
553        );
554        assert_eq!(
555            result.len(),
556            TonicExporterBuilder::default()
557                .tonic_config
558                .metadata
559                .unwrap()
560                .len()
561        );
562    }
563
564    #[test]
565    #[cfg(feature = "gzip-tonic")]
566    fn test_with_gzip_compression() {
567        let mut metadata = MetadataMap::new();
569        metadata.insert("foo", "bar".parse().unwrap());
570        let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
571        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
572    }
573
574    #[test]
575    #[cfg(feature = "zstd-tonic")]
576    fn test_with_zstd_compression() {
577        let builder = TonicExporterBuilder::default().with_compression(Compression::Zstd);
578        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Zstd);
579    }
580
581    #[test]
582    fn test_convert_compression() {
583        #[cfg(feature = "gzip-tonic")]
584        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_ok());
585        #[cfg(not(feature = "gzip-tonic"))]
586        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_err());
587        #[cfg(feature = "zstd-tonic")]
588        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_ok());
589        #[cfg(not(feature = "zstd-tonic"))]
590        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_err());
591    }
592
593    #[cfg(feature = "zstd-tonic")]
594    #[test]
595    fn test_priority_of_signal_env_over_generic_env_for_compression() {
596        run_env_test(
597            vec![
598                (crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, "zstd"),
599                (crate::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"),
600            ],
601            || {
602                let builder = TonicExporterBuilder::default();
603
604                let compression = builder
605                    .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
606                    .unwrap();
607                assert_eq!(compression, Some(tonic::codec::CompressionEncoding::Zstd));
608            },
609        );
610    }
611
612    #[cfg(feature = "zstd-tonic")]
613    #[test]
614    fn test_priority_of_code_based_config_over_envs_for_compression() {
615        run_env_test(
616            vec![
617                (crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, "gzip"),
618                (crate::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"),
619            ],
620            || {
621                let builder = TonicExporterBuilder::default().with_compression(Compression::Zstd);
622
623                let compression = builder
624                    .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
625                    .unwrap();
626                assert_eq!(compression, Some(tonic::codec::CompressionEncoding::Zstd));
627            },
628        );
629    }
630
631    #[test]
632    fn test_use_default_when_others_missing_for_compression() {
633        run_env_test(vec![], || {
634            let builder = TonicExporterBuilder::default();
635
636            let compression = builder
637                .resolve_compression(crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
638                .unwrap();
639            assert!(compression.is_none());
640        });
641    }
642
643    #[test]
644    fn test_parse_headers_from_env() {
645        run_env_test(
646            vec![
647                (OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"),
648                (OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"),
649            ],
650            || {
651                assert_eq!(
652                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS).0,
653                    HeaderMap::from_iter([
654                        (
655                            HeaderName::from_static("k1"),
656                            HeaderValue::from_static("v1")
657                        ),
658                        (
659                            HeaderName::from_static("k2"),
660                            HeaderValue::from_static("v2")
661                        ),
662                    ])
663                );
664
665                assert_eq!(
666                    super::parse_headers_from_env("EMPTY_ENV").0,
667                    HeaderMap::from_iter([(
668                        HeaderName::from_static("k3"),
669                        HeaderValue::from_static("v3")
670                    )])
671                );
672            },
673        )
674    }
675
676    #[test]
677    fn test_merge_metadata_with_headers_from_env() {
678        run_env_test(
679            vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")],
680            || {
681                let headers_from_env =
682                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS);
683
684                let mut metadata = MetadataMap::new();
685                metadata.insert("foo", "bar".parse().unwrap());
686                metadata.insert("k1", "v0".parse().unwrap());
687
688                let result =
689                    super::merge_metadata_with_headers_from_env(metadata, headers_from_env.0);
690
691                assert_eq!(
692                    result.get("foo").unwrap(),
693                    MetadataValue::from_static("bar")
694                );
695                assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1"));
696                assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2"));
697            },
698        );
699    }
700
701    #[test]
702    fn test_priority_of_signal_env_over_generic_env_for_endpoint() {
703        run_env_test(
704            vec![
705                (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://localhost:1234"),
706                (super::OTEL_EXPORTER_OTLP_ENDPOINT, "http://localhost:2345"),
707            ],
708            || {
709                let url = TonicExporterBuilder::resolve_endpoint(
710                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
711                    None,
712                );
713                assert_eq!(url, "http://localhost:1234");
714            },
715        );
716    }
717
718    #[test]
719    fn test_priority_of_code_based_config_over_envs_for_endpoint() {
720        run_env_test(
721            vec![
722                (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://localhost:1234"),
723                (super::OTEL_EXPORTER_OTLP_ENDPOINT, "http://localhost:2345"),
724            ],
725            || {
726                let url = TonicExporterBuilder::resolve_endpoint(
727                    OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
728                    Some("http://localhost:3456".to_string()),
729                );
730                assert_eq!(url, "http://localhost:3456");
731            },
732        );
733    }
734
735    #[test]
736    fn test_use_default_when_others_missing_for_endpoint() {
737        run_env_test(vec![], || {
738            let url =
739                TonicExporterBuilder::resolve_endpoint(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, None);
740            assert_eq!(url, "http://localhost:4317");
741        });
742    }
743
744    #[test]
745    fn test_use_default_when_empty_string_for_option() {
746        run_env_test(vec![], || {
747            let url = TonicExporterBuilder::resolve_endpoint(
748                OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
749                Some(String::new()),
750            );
751            assert_eq!(url, "http://localhost:4317");
752        });
753    }
754}