telemetry_rust/
otlp.rs

// Originally retrieved from davidB/tracing-opentelemetry-instrumentation-sdk
2// which is licensed under CC0 1.0 Universal
3// https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk/blob/d3609ac2cc699d3a24fbf89754053cc8e938e3bf/LICENSE
4
5use std::{collections::HashMap, str::FromStr};
6
7use opentelemetry::trace::TraceError;
8use opentelemetry_http::hyper::HyperClient;
9use opentelemetry_otlp::{
10    ExportConfig, Protocol, SpanExporter, WithExportConfig, WithHttpConfig,
11};
12use opentelemetry_sdk::{
13    runtime,
14    trace::{Sampler, TracerProvider},
15    Resource,
16};
17use std::time::Duration;
18
19pub use crate::filter::read_tracing_level_from_env as read_otel_log_level_from_env;
20use crate::util;
21
22#[must_use]
23pub fn identity(
24    v: opentelemetry_sdk::trace::Builder,
25) -> opentelemetry_sdk::trace::Builder {
26    v
27}
28
29// see https://opentelemetry.io/docs/reference/specification/protocol/exporter/
30pub fn init_tracer<F>(
31    resource: Resource,
32    transform: F,
33) -> Result<TracerProvider, TraceError>
34where
35    F: FnOnce(opentelemetry_sdk::trace::Builder) -> opentelemetry_sdk::trace::Builder,
36{
37    let (maybe_protocol, maybe_endpoint, maybe_timeout) = read_export_config_from_env();
38    let export_config = infer_export_config(
39        maybe_protocol.as_deref(),
40        maybe_endpoint.as_deref(),
41        maybe_timeout.as_deref(),
42    )?;
43    tracing::debug!(target: "otel::setup", export_config = format!("{export_config:?}"));
44    let exporter: SpanExporter = match export_config.protocol {
45        Protocol::HttpBinary => SpanExporter::builder()
46            .with_http()
47            .with_http_client(HyperClient::with_default_connector(
48                export_config.timeout,
49                None,
50            ))
51            .with_headers(read_headers_from_env())
52            .with_export_config(export_config)
53            .build()?,
54        Protocol::Grpc => SpanExporter::builder()
55            .with_tonic()
56            .with_export_config(export_config)
57            .build()?,
58        Protocol::HttpJson => unreachable!("HttpJson protocol is not supported"),
59    };
60
61    let tracer_provider_builder = TracerProvider::builder()
62        .with_batch_exporter(exporter, runtime::Tokio)
63        .with_resource(resource)
64        .with_sampler(read_sampler_from_env());
65
66    Ok(transform(tracer_provider_builder).build())
67}
68
/// Turn a string of "k1=v1,k2=v2,..." into an iterator of (key, value) tuples.
///
/// Each comma-separated entry is split on its first `=`, so a value may itself
/// contain `=`. Whitespace around keys and values is trimmed, which lets the
/// common `"k1=v1, k2=v2"` spelling produce usable header names. Entries
/// without `=` and entries whose key is empty are skipped.
fn parse_headers(val: &str) -> impl Iterator<Item = (String, String)> + '_ {
    val.split(',').filter_map(|kv| {
        let (key, value) = kv.split_once('=')?;
        let key = key.trim();
        if key.is_empty() {
            // An empty header name can never be sent; drop the entry.
            return None;
        }
        Some((key.to_owned(), value.trim().to_owned()))
    })
}
78fn read_headers_from_env() -> HashMap<String, String> {
79    let mut headers = HashMap::new();
80    headers.extend(parse_headers(
81        &util::env_var("OTEL_EXPORTER_OTLP_HEADERS").unwrap_or_default(),
82    ));
83    headers.extend(parse_headers(
84        &util::env_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS").unwrap_or_default(),
85    ));
86    headers
87}
88fn read_export_config_from_env() -> (Option<String>, Option<String>, Option<String>) {
89    let maybe_endpoint = util::env_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
90        .or_else(|| util::env_var("OTEL_EXPORTER_OTLP_ENDPOINT"));
91    let maybe_protocol = util::env_var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL")
92        .or_else(|| util::env_var("OTEL_EXPORTER_OTLP_PROTOCOL"));
93    let maybe_timeout = util::env_var("OTEL_EXPORTER_OTLP_TRACES_TIMEOUT")
94        .or_else(|| util::env_var("OTEL_EXPORTER_OTLP_TIMEOUT"));
95    (maybe_protocol, maybe_endpoint, maybe_timeout)
96}
97
98/// see <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables/#general-sdk-configuration>
99/// TODO log error and infered sampler
100fn read_sampler_from_env() -> Sampler {
101    let mut name = util::env_var("OTEL_TRACES_SAMPLER")
102        .unwrap_or_default()
103        .to_lowercase();
104    let v = match name.as_str() {
105        "always_on" => Sampler::AlwaysOn,
106        "always_off" => Sampler::AlwaysOff,
107        "traceidratio" => Sampler::TraceIdRatioBased(read_sampler_arg_from_env(1f64)),
108        "parentbased_always_on" => Sampler::ParentBased(Box::new(Sampler::AlwaysOn)),
109        "parentbased_always_off" => Sampler::ParentBased(Box::new(Sampler::AlwaysOff)),
110        "parentbased_traceidratio" => Sampler::ParentBased(Box::new(
111            Sampler::TraceIdRatioBased(read_sampler_arg_from_env(1f64)),
112        )),
113        "jaeger_remote" => todo!("unsupported: OTEL_TRACES_SAMPLER='jaeger_remote'"),
114        "xray" => todo!("unsupported: OTEL_TRACES_SAMPLER='xray'"),
115        _ => {
116            name = "parentbased_always_on".to_string();
117            Sampler::ParentBased(Box::new(Sampler::AlwaysOn))
118        }
119    };
120    tracing::debug!(target: "otel::setup", OTEL_TRACES_SAMPLER = ?name);
121    v
122}
123
124fn read_sampler_arg_from_env<T>(default: T) -> T
125where
126    T: FromStr + Copy + std::fmt::Debug,
127{
128    //TODO Log for invalid value (how to log)
129    let v = util::env_var("OTEL_TRACES_SAMPLER_ARG")
130        .map_or(default, |s| T::from_str(&s).unwrap_or(default));
131    tracing::debug!(target: "otel::setup", OTEL_TRACES_SAMPLER_ARG = ?v);
132    v
133}
134
135fn infer_export_config(
136    maybe_protocol: Option<&str>,
137    maybe_endpoint: Option<&str>,
138    maybe_timeout: Option<&str>,
139) -> Result<ExportConfig, TraceError> {
140    let protocol = match maybe_protocol {
141        Some("grpc") => Protocol::Grpc,
142        Some("http") | Some("http/protobuf") => Protocol::HttpBinary,
143        Some(other) => {
144            return Err(TraceError::from(format!(
145                "unsupported protocol {other:?} form env"
146            )))
147        }
148        None => match maybe_endpoint {
149            Some(e) if e.contains(":4317") => Protocol::Grpc,
150            _ => Protocol::HttpBinary,
151        },
152    };
153
154    let timeout = match maybe_timeout {
155        Some(millis) => Duration::from_millis(millis.parse::<u64>().map_err(|err| {
156            TraceError::from(format!("invalid timeout {millis:?} form env: {err}"))
157        })?),
158        None => {
159            Duration::from_secs(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT)
160        }
161    };
162
163    Ok(ExportConfig {
164        endpoint: maybe_endpoint.map(ToOwned::to_owned),
165        protocol,
166        timeout,
167    })
168}
169
#[cfg(test)]
mod tests {
    use assert2::assert;
    use rstest::rstest;

    use super::*;
    use Protocol::*;

    /// Timeout expected whenever a case supplies no explicit timeout.
    const DEFAULT_TIMEOUT: Duration =
        Duration::from_secs(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT);

    /// Each case lists the protocol, endpoint and timeout inputs followed by
    /// the protocol, endpoint and timeout expected in the resulting config.
    #[rstest]
    #[case(None, None, None, HttpBinary, None, DEFAULT_TIMEOUT)]
    #[case(Some("http/protobuf"), None, None, HttpBinary, None, DEFAULT_TIMEOUT)]
    #[case(Some("http"), None, None, HttpBinary, None, DEFAULT_TIMEOUT)]
    #[case(Some("grpc"), None, None, Grpc, None, DEFAULT_TIMEOUT)]
    #[case(
        None,
        Some("http://localhost:4317"),
        None,
        Grpc,
        Some("http://localhost:4317"),
        DEFAULT_TIMEOUT
    )]
    #[case(
        Some("http/protobuf"),
        Some("http://localhost:4318"),
        None,
        HttpBinary,
        Some("http://localhost:4318"),
        DEFAULT_TIMEOUT
    )]
    #[case(
        Some("http/protobuf"),
        Some("https://examples.com:4318"),
        None,
        HttpBinary,
        Some("https://examples.com:4318"),
        DEFAULT_TIMEOUT
    )]
    #[case(
        Some("http/protobuf"),
        Some("https://examples.com:4317"),
        Some("12345"),
        HttpBinary,
        Some("https://examples.com:4317"),
        Duration::from_millis(12345)
    )]
    fn test_infer_export_config(
        #[case] traces_protocol: Option<&str>,
        #[case] traces_endpoint: Option<&str>,
        #[case] traces_timeout: Option<&str>,
        #[case] expected_protocol: Protocol,
        #[case] expected_endpoint: Option<&str>,
        #[case] expected_timeout: Duration,
    ) {
        let config = infer_export_config(traces_protocol, traces_endpoint, traces_timeout)
            .unwrap();

        assert!(config.protocol == expected_protocol);
        assert!(config.endpoint.as_deref() == expected_endpoint);
        assert!(config.timeout == expected_timeout);
    }
}