use std::collections::HashMap;
use anyhow::Result;
use opentelemetry::{
propagation::{Extractor, Injector},
trace::TracerProvider,
};
use opentelemetry_otlp::{SpanExporter, WithExportConfig, WithHttpConfig};
use opentelemetry_sdk::{trace as sdktrace, trace::Sampler};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use crate::{
parse_env::{parse_bool_env, parse_key_value_env, parse_string_list_env},
TelemetryHandle,
};
/// Fallback OTLP endpoint used when the effective traces protocol is not gRPC.
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4318";
/// Fallback OTLP endpoint used when the effective traces protocol is gRPC.
pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317";
/// Value returned by [`init_otel`]: the telemetry handle plus the tracer, if one was created.
pub struct OtelResult {
/// Handle built from the tracer provider, or an empty handle when no provider was set up.
pub handle: TelemetryHandle,
/// Tracer obtained from the provider; `None` when tracing is disabled or no endpoint is set.
pub tracer: Option<sdktrace::SdkTracer>,
}
/// Wire protocol used to reach the OTLP collector.
///
/// Environment values are matched against `grpc`, `http/protobuf` and `http/json`;
/// the default variant is [`Protocol::HttpProtobuf`].
#[derive(Debug, Clone, PartialEq, Default)]
pub enum Protocol {
/// OTLP over gRPC (exported through the tonic-based exporter in `init_otel`).
Grpc,
/// OTLP over HTTP with protobuf payloads.
#[default]
HttpProtobuf,
/// OTLP over HTTP with JSON payloads.
HttpJson,
}
/// Trace sampling strategy applied to the tracer provider.
#[derive(Debug, Clone, PartialEq)]
pub enum Sampling {
    /// Keep every trace.
    AlwaysOn,
    /// Keep no traces.
    AlwaysOff,
    /// Keep the given fraction of traces (used as a trace-id ratio).
    Ratio(f64),
}

impl Default for Sampling {
    /// Samples 5% of traces unless configured otherwise.
    fn default() -> Self {
        Self::Ratio(0.05)
    }
}
/// OTLP exporter configuration.
///
/// The `Default` implementation fills every field from `OTEL_*` environment variables
/// (named per field below); the `with_*` builder methods override individual fields.
#[derive(Debug, Clone)]
pub struct OtlpConfig {
/// Service name (`OTEL_SERVICE_NAME`, default `"rialo"`).
pub service_name: String,
/// Service version (`OTEL_SERVICE_VERSION`, default `"unknown"`).
pub service_version: String,
/// Log level string (`OTEL_LOG_LEVEL`, default `"info"`).
pub log_level: String,
/// Extra resource attributes (`OTEL_RESOURCE_ATTRIBUTES`).
pub resource_attributes: HashMap<String, String>,
/// Console output toggle; `true` by default and not read from the environment.
pub enable_console: bool,
/// General exporter endpoint (`OTEL_EXPORTER_OTLP_ENDPOINT`, else a protocol-based default).
pub exporter_endpoint: Option<String>,
/// `OTEL_EXPORTER_OTLP_INSECURE`, default `false`.
pub exporter_endpoint_insecure: bool,
/// Traces-specific endpoint (`OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`); takes precedence over
/// `exporter_endpoint` for traces.
pub traces_exporter_endpoint: Option<String>,
/// `OTEL_EXPORTER_OTLP_TRACES_INSECURE`, default `false`.
pub traces_exporter_endpoint_insecure: bool,
/// Whether tracing is enabled (`OTEL_TRACES_ENABLED`, default `true`).
pub traces_enabled: bool,
/// Metrics-specific endpoint (`OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`).
pub metrics_exporter_endpoint: Option<String>,
/// `OTEL_EXPORTER_OTLP_METRICS_INSECURE`, default `false`.
pub metrics_exporter_endpoint_insecure: bool,
/// Whether metrics are enabled (`OTEL_METRICS_ENABLED`, default `true`).
pub metrics_enabled: bool,
/// Metrics reporting period string (`OTEL_EXPORTER_OTLP_METRICS_PERIOD`, default `"30s"`).
pub metrics_reporting_period: String,
/// Propagator names (`OTEL_PROPAGATORS`, default `tracecontext` and `baggage`).
pub propagators: Vec<String>,
/// General exporter protocol (`OTEL_EXPORTER_OTLP_PROTOCOL`).
pub exporter_protocol: Protocol,
/// Traces protocol override; `Some` only when `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` is set.
pub traces_exporter_protocol: Option<Protocol>,
/// Metrics protocol override; `Some` only when `OTEL_EXPORTER_OTLP_METRICS_PROTOCOL` is set.
pub metrics_exporter_protocol: Option<Protocol>,
/// Headers for all signals (`OTEL_EXPORTER_OTLP_HEADERS`).
pub headers: HashMap<String, String>,
/// Traces-only headers (`OTEL_EXPORTER_OTLP_TRACES_HEADERS`); override `headers` on key clash.
pub traces_headers: HashMap<String, String>,
/// Metrics-only headers (`OTEL_EXPORTER_OTLP_METRICS_HEADERS`).
pub metrics_headers: HashMap<String, String>,
/// Trace sampling strategy (`OTEL_TRACES_SAMPLER` / `OTEL_TRACES_SAMPLER_ARG`).
pub sampling: Sampling,
}
impl Default for OtlpConfig {
fn default() -> Self {
let main_protocol = parse_protocol_env("OTEL_EXPORTER_OTLP_PROTOCOL", Protocol::default());
let traces_protocol = std::env::var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL")
.ok()
.map(|_| {
parse_protocol_env("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", main_protocol.clone())
})
.unwrap_or_else(|| main_protocol.clone());
let default_endpoint = match traces_protocol {
Protocol::Grpc => DEFAULT_OTLP_GRPC_ENDPOINT,
_ => DEFAULT_OTLP_ENDPOINT,
};
let general_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT")
.ok()
.or_else(|| Some(default_endpoint.to_string()));
Self {
service_name: std::env::var("OTEL_SERVICE_NAME")
.unwrap_or_else(|_| "rialo".to_string()),
service_version: std::env::var("OTEL_SERVICE_VERSION")
.unwrap_or_else(|_| "unknown".to_string()),
log_level: std::env::var("OTEL_LOG_LEVEL").unwrap_or_else(|_| "info".to_string()),
resource_attributes: parse_key_value_env("OTEL_RESOURCE_ATTRIBUTES"),
enable_console: true,
exporter_endpoint: general_endpoint.clone(),
exporter_endpoint_insecure: parse_bool_env("OTEL_EXPORTER_OTLP_INSECURE", false),
traces_exporter_endpoint: std::env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT").ok(),
traces_exporter_endpoint_insecure: parse_bool_env(
"OTEL_EXPORTER_OTLP_TRACES_INSECURE",
false,
),
traces_enabled: parse_bool_env("OTEL_TRACES_ENABLED", true),
metrics_exporter_endpoint: std::env::var("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT").ok(),
metrics_exporter_endpoint_insecure: parse_bool_env(
"OTEL_EXPORTER_OTLP_METRICS_INSECURE",
false,
),
metrics_enabled: parse_bool_env("OTEL_METRICS_ENABLED", true),
metrics_reporting_period: std::env::var("OTEL_EXPORTER_OTLP_METRICS_PERIOD")
.unwrap_or_else(|_| "30s".to_string()),
propagators: parse_string_list_env(
"OTEL_PROPAGATORS",
vec!["tracecontext".to_string(), "baggage".to_string()],
),
exporter_protocol: main_protocol.clone(),
traces_exporter_protocol: std::env::var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL")
.ok()
.map(|_| {
parse_protocol_env("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", main_protocol.clone())
}),
metrics_exporter_protocol: std::env::var("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL")
.ok()
.map(|_| parse_protocol_env("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", main_protocol)),
headers: parse_key_value_env("OTEL_EXPORTER_OTLP_HEADERS"),
traces_headers: parse_key_value_env("OTEL_EXPORTER_OTLP_TRACES_HEADERS"),
metrics_headers: parse_key_value_env("OTEL_EXPORTER_OTLP_METRICS_HEADERS"),
sampling: parse_sampler_env(
"OTEL_TRACES_SAMPLER",
"OTEL_TRACES_SAMPLER_ARG",
Sampling::default(),
),
}
}
}
impl OtlpConfig {
    /// Creates a configuration from the environment; same as [`OtlpConfig::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Checks that `url` is non-empty and parses as a URL.
    ///
    /// # Errors
    /// Returns an error for an empty string or a string `url::Url` cannot parse.
    fn validate_url(url: &str) -> anyhow::Result<()> {
        if url.is_empty() {
            return Err(anyhow::anyhow!("URL cannot be empty. Provide a valid endpoint URL or omit the configuration to disable tracing."));
        }
        url.parse::<url::Url>()
            .map_err(|_| anyhow::anyhow!("Invalid URL format: '{}'. Please provide a valid URL (e.g., 'http://localhost:4318' or 'https://api.honeycomb.io/v1/traces')", url))?;
        Ok(())
    }

    /// Checks that a service name is non-empty, at most 255 bytes long (`str::len`), and
    /// made only of alphanumeric characters, `-`, `_` and `.`.
    fn validate_service_name(name: &str) -> anyhow::Result<()> {
        if name.is_empty() {
            return Err(anyhow::anyhow!("Service name cannot be empty"));
        }
        if name.len() > 255 {
            return Err(anyhow::anyhow!("Service name cannot exceed 255 characters"));
        }
        let allowed = |c: char| c.is_alphanumeric() || matches!(c, '-' | '_' | '.');
        if !name.chars().all(allowed) {
            return Err(anyhow::anyhow!("Service name can only contain alphanumeric characters, hyphens, underscores, and dots"));
        }
        Ok(())
    }

    /// Checks that a header value is ASCII and has no control characters other than tab.
    fn validate_header_value(value: &str) -> anyhow::Result<()> {
        if !value.is_ascii() {
            return Err(anyhow::anyhow!("Header values must be ASCII"));
        }
        if value.chars().any(|c| c.is_control() && c != '\t') {
            return Err(anyhow::anyhow!(
                "Header values cannot contain control characters (except tab)"
            ));
        }
        Ok(())
    }

    /// Validates the fields used for trace export: the service name, the general and
    /// traces endpoints (when present), and the general and traces header values.
    ///
    /// # Errors
    /// Returns the first validation failure encountered, in the order listed above.
    pub fn validate(&self) -> anyhow::Result<()> {
        Self::validate_service_name(&self.service_name)?;
        let endpoints = self
            .exporter_endpoint
            .iter()
            .chain(self.traces_exporter_endpoint.iter());
        for endpoint in endpoints {
            Self::validate_url(endpoint)?;
        }
        for value in self.headers.values().chain(self.traces_headers.values()) {
            Self::validate_header_value(value)?;
        }
        Ok(())
    }

    /// Sets the service name.
    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
        self.service_name = name.into();
        self
    }

    /// Sets the service version.
    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
        self.service_version = version.into();
        self
    }

    /// Sets the log level string.
    pub fn with_log_level(mut self, level: impl Into<String>) -> Self {
        self.log_level = level.into();
        self
    }

    /// Enables or disables console output.
    pub fn with_console_enabled(mut self, enabled: bool) -> Self {
        self.enable_console = enabled;
        self
    }

    /// Enables or disables tracing.
    pub fn with_traces_enabled(mut self, enabled: bool) -> Self {
        self.traces_enabled = enabled;
        self
    }

    /// Sets the general exporter endpoint.
    pub fn with_exporter_endpoint(mut self, endpoint: impl Into<String>) -> Self {
        self.exporter_endpoint = Some(endpoint.into());
        self
    }

    /// Sets the traces-specific endpoint, which takes precedence for traces.
    pub fn with_traces_endpoint(mut self, endpoint: impl Into<String>) -> Self {
        self.traces_exporter_endpoint = Some(endpoint.into());
        self
    }

    /// Sets the general exporter protocol.
    pub fn with_exporter_protocol(mut self, protocol: Protocol) -> Self {
        self.exporter_protocol = protocol;
        self
    }

    /// Sets the trace sampling strategy.
    pub fn with_traces_sampling(mut self, sampling: Sampling) -> Self {
        self.sampling = sampling;
        self
    }

    /// Adds (or replaces) one resource attribute.
    pub fn add_resource_attribute(mut self, key: String, value: String) -> Self {
        self.resource_attributes.insert(key, value);
        self
    }

    /// Endpoint used for traces: the traces-specific one if set, else the general one.
    pub fn effective_traces_endpoint(&self) -> Option<&String> {
        self.traces_exporter_endpoint
            .as_ref()
            .or(self.exporter_endpoint.as_ref())
    }

    /// Protocol used for traces: the traces override if set, else the general protocol.
    pub fn effective_traces_protocol(&self) -> &Protocol {
        self.traces_exporter_protocol
            .as_ref()
            .unwrap_or(&self.exporter_protocol)
    }

    /// Returns the general headers merged with the traces headers; on a key clash the
    /// traces value wins.
    pub fn effective_traces_headers(&self) -> HashMap<String, String> {
        let mut merged = self.headers.clone();
        merged.extend(
            self.traces_headers
                .iter()
                .map(|(key, value)| (key.clone(), value.clone())),
        );
        merged
    }

    /// Iterates over the effective traces headers without cloning.
    ///
    /// Each key is yielded once: a general header that is also defined in the traces
    /// headers is skipped, so the result matches [`Self::effective_traces_headers`].
    pub fn iter_effective_traces_headers(&self) -> impl Iterator<Item = (&String, &String)> {
        let traces_headers = &self.traces_headers;
        self.headers
            .iter()
            // `move` copies the reference so the closure does not borrow a local.
            .filter(move |(key, _)| !traces_headers.contains_key(*key))
            .chain(traces_headers.iter())
    }

    /// Returns `true` when `key` is present in the general or the traces headers.
    pub fn has_effective_traces_header(&self, key: &str) -> bool {
        self.headers.contains_key(key) || self.traces_headers.contains_key(key)
    }

    /// Looks up one effective header; the traces value takes precedence.
    pub fn get_effective_traces_header(&self, key: &str) -> Option<&String> {
        self.traces_headers
            .get(key)
            .or_else(|| self.headers.get(key))
    }
}
/// Builds the text-map propagator from the configured propagator names.
///
/// Recognises `tracecontext` and `baggage` (case-insensitive, surrounding whitespace
/// ignored). Other names are reported and skipped. When no recognised name is given,
/// W3C trace-context propagation is used.
fn traces_text_map_propagator(
    names: &[String],
) -> opentelemetry::propagation::TextMapCompositePropagator {
    use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
    use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};

    let mut want_trace_context = false;
    let mut want_baggage = false;
    for name in names {
        match name.trim().to_ascii_lowercase().as_str() {
            "tracecontext" => want_trace_context = true,
            "baggage" => want_baggage = true,
            "" | "none" => {}
            other => println!("Ignoring unsupported OTLP propagator: {}", other),
        }
    }

    let mut propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>> = Vec::new();
    // Trace context is also the fallback when nothing recognised was requested.
    if want_trace_context || !want_baggage {
        propagators.push(Box::new(TraceContextPropagator::new()));
    }
    if want_baggage {
        propagators.push(Box::new(BaggagePropagator::new()));
    }
    TextMapCompositePropagator::new(propagators)
}

/// Validates `otlp_config`, builds an OTLP span exporter and tracer provider, and
/// installs the provider and the text-map propagator globally.
///
/// Returns an empty handle and no tracer when tracing is disabled or no endpoint is
/// configured.
///
/// # Errors
/// Fails when the configuration does not validate, when the effective endpoint is empty,
/// or when the exporter cannot be built.
pub async fn init_otel(otlp_config: &OtlpConfig) -> Result<OtelResult> {
    otlp_config
        .validate()
        .map_err(|e| anyhow::anyhow!("OTLP configuration validation failed: {}", e))?;
    if !otlp_config.traces_enabled {
        return Ok(OtelResult {
            handle: TelemetryHandle::empty(),
            tracer: None,
        });
    }

    // Resource: service name and version, plus any user-supplied attributes.
    let otel_resource = otlp_config
        .resource_attributes
        .iter()
        .fold(
            opentelemetry_sdk::resource::Resource::builder()
                .with_service_name(otlp_config.service_name.clone())
                .with_attribute(opentelemetry::KeyValue::new(
                    "service.version",
                    otlp_config.service_version.clone(),
                )),
            |builder, (key, value)| {
                builder.with_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()))
            },
        )
        .build();

    let endpoint = match otlp_config.effective_traces_endpoint() {
        Some(endpoint) if !endpoint.is_empty() => endpoint,
        Some(_) => {
            return Err(anyhow::anyhow!(
                "Empty endpoint URL provided. Either provide a valid endpoint URL or disable tracing by setting traces_enabled=false"
            ));
        }
        None => {
            return Ok(OtelResult {
                handle: TelemetryHandle::empty(),
                tracer: None,
            });
        }
    };

    let protocol = otlp_config.effective_traces_protocol();
    if let Some(override_protocol) = &otlp_config.traces_exporter_protocol {
        if *override_protocol != otlp_config.exporter_protocol {
            println!(
                "Using OTLP traces protocol override: {:?}",
                otlp_config.traces_exporter_protocol
            );
        }
    }
    if matches!(protocol, Protocol::HttpProtobuf | Protocol::HttpJson)
        && !endpoint.ends_with("/v1/traces")
    {
        println!(
            "OTLP HTTP endpoint does not end with /v1/traces: {}",
            endpoint
        );
    }

    let exporter = match protocol {
        // Configured headers are only applied on the HTTP path below, not for gRPC.
        Protocol::Grpc => SpanExporter::builder()
            .with_tonic()
            .with_endpoint(endpoint)
            .build()?,
        // NOTE(review): HttpJson is built exactly like HttpProtobuf here; confirm whether
        // the exporter needs an explicit protocol selection for JSON payloads.
        Protocol::HttpProtobuf | Protocol::HttpJson => {
            let mut exporter_builder = SpanExporter::builder().with_http().with_endpoint(endpoint);
            let headers = otlp_config.effective_traces_headers();
            if !headers.is_empty() {
                exporter_builder = exporter_builder.with_headers(headers);
            }
            exporter_builder.build()?
        }
    };

    let sampler = match otlp_config.sampling {
        Sampling::AlwaysOn => Sampler::AlwaysOn,
        Sampling::AlwaysOff => Sampler::AlwaysOff,
        Sampling::Ratio(ratio) => Sampler::TraceIdRatioBased(ratio),
    };
    let tracer_provider = sdktrace::SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_resource(otel_resource)
        .with_sampler(sampler)
        .build();
    let tracer = tracer_provider.tracer(otlp_config.service_name.clone());
    opentelemetry::global::set_tracer_provider(tracer_provider.clone());

    // Honour the configured propagators instead of always installing trace-context only.
    opentelemetry::global::set_text_map_propagator(traces_text_map_propagator(
        &otlp_config.propagators,
    ));

    Ok(OtelResult {
        handle: TelemetryHandle::new(tracer_provider),
        tracer: Some(tracer),
    })
}
/// Propagator carrier that writes injected key/value pairs into a borrowed header map.
struct HeaderInjector<'a>(&'a mut HashMap<String, String>);

impl<'a> Injector for HeaderInjector<'a> {
    /// Stores `value` under `key`, replacing any previous value for that key.
    fn set(&mut self, key: &str, value: String) {
        let HeaderInjector(map) = self;
        map.insert(key.to_owned(), value);
    }
}
/// Injects the current tracing span's OpenTelemetry context into a fresh header map
/// using the globally registered text-map propagator.
pub fn inject_trace_headers() -> HashMap<String, String> {
    let mut carrier = HashMap::new();
    let cx = tracing::Span::current().context();
    opentelemetry::global::get_text_map_propagator(|propagator| {
        let mut injector = HeaderInjector(&mut carrier);
        propagator.inject_context(&cx, &mut injector);
    });
    carrier
}
/// Adds each entry of `headers` to `request_builder` and returns the builder.
///
/// Entries whose name or value is not a valid HTTP header are skipped. The skip is
/// logged (name only) rather than dropped silently.
pub fn apply_trace_headers_to_reqwest(
    mut request_builder: reqwest::RequestBuilder,
    headers: HashMap<String, String>,
) -> reqwest::RequestBuilder {
    use reqwest::header::{HeaderName, HeaderValue};

    for (key, value) in headers {
        match (
            HeaderName::from_bytes(key.as_bytes()),
            HeaderValue::from_str(&value),
        ) {
            (Ok(header_name), Ok(header_value)) => {
                tracing::trace!("Adding trace header: {} = {}", key, value);
                request_builder = request_builder.header(header_name, header_value);
            }
            _ => {
                tracing::debug!(
                    "Skipping trace header with invalid name or value: {}",
                    key
                );
            }
        }
    }
    request_builder
}
/// Extracts a trace context from incoming axum headers with the global text-map
/// propagator and sets it as the parent of the current tracing span.
pub fn extract_and_set_trace_context_axum(headers: &axum::http::HeaderMap) {
    /// Read-only propagator carrier over an axum header map.
    struct AxumHeaderExtractor<'a>(&'a axum::http::HeaderMap);

    impl<'a> Extractor for AxumHeaderExtractor<'a> {
        // Values that are not valid visible ASCII are reported as absent.
        fn get(&self, key: &str) -> Option<&str> {
            let raw = self.0.get(key)?;
            raw.to_str().ok()
        }

        fn keys(&self) -> Vec<&str> {
            self.0.keys().map(|name| name.as_str()).collect()
        }
    }

    tracing::debug!("Extracting trace context from {} headers", headers.len());
    let carrier = AxumHeaderExtractor(headers);
    let parent_cx =
        opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&carrier));
    let current = tracing::Span::current();
    tracing::debug!("Setting parent context on span: {:?}", current.id());
    let _ = current.set_parent(parent_cx);
}
/// Propagator carrier that writes injected key/value pairs into a borrowed map of
/// environment variables.
struct EnvInjector<'a>(&'a mut HashMap<String, String>);

impl<'a> Injector for EnvInjector<'a> {
    /// Stores `value` under `key`, replacing any previous value for that key.
    fn set(&mut self, key: &str, value: String) {
        let EnvInjector(vars) = self;
        vars.insert(key.to_owned(), value);
    }
}
/// Read-only propagator carrier over a borrowed map of environment variables.
struct EnvExtractor<'a>(&'a HashMap<String, String>);

impl<'a> Extractor for EnvExtractor<'a> {
    /// Returns the value stored under exactly `key`, if any.
    fn get(&self, key: &str) -> Option<&str> {
        self.0.get(key).map(String::as_str)
    }

    /// Returns every key in the underlying map (order unspecified).
    fn keys(&self) -> Vec<&str> {
        self.0.keys().map(String::as_str).collect()
    }
}
/// Injects the current tracing span's OpenTelemetry context into a fresh map intended
/// to be passed on as environment variables.
pub fn inject_trace_env() -> HashMap<String, String> {
    let mut vars = HashMap::new();
    let cx = tracing::Span::current().context();
    opentelemetry::global::get_text_map_propagator(|propagator| {
        let mut injector = EnvInjector(&mut vars);
        propagator.inject_context(&cx, &mut injector);
    });
    vars
}
/// Reads trace-context variables from the process environment and sets the extracted
/// context as the parent of the current tracing span.
///
/// Only variables whose name starts with `traceparent`, `tracestate`, `baggage` or `b3`
/// are considered. The environment is read with `vars_os`, so unrelated variables that
/// are not valid Unicode cannot cause a panic. A trace variable whose name or value is
/// not valid Unicode is skipped.
pub fn extract_and_set_trace_context_env() {
    const TRACE_KEY_PREFIXES: [&str; 4] = ["traceparent", "tracestate", "baggage", "b3"];

    let env_vars: HashMap<String, String> = std::env::vars_os()
        .filter_map(|(key, value)| {
            let key = key.into_string().ok()?;
            let is_trace_key = TRACE_KEY_PREFIXES
                .iter()
                .any(|prefix| key.starts_with(*prefix));
            if !is_trace_key {
                return None;
            }
            let value = value.into_string().ok()?;
            Some((key, value))
        })
        .collect();
    if env_vars.is_empty() {
        tracing::debug!("No trace context found in environment variables");
        return;
    }
    tracing::debug!(
        "Extracting trace context from {} environment variables: {:?}",
        env_vars.len(),
        env_vars.keys().collect::<Vec<_>>()
    );
    let extractor = EnvExtractor(&env_vars);
    let context =
        opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor));
    let span = tracing::Span::current();
    tracing::debug!("Setting parent context on span: {:?}", span.id());
    let _ = span.set_parent(context);
}
/// Same as extracting from the process environment, but reads from the supplied map:
/// keeps the entries whose name starts with `traceparent`, `tracestate`, `baggage` or
/// `b3`, extracts a context from them, and sets it as the current span's parent.
pub fn extract_and_set_trace_context_from_env_map(env_vars: &HashMap<String, String>) {
    let mut trace_vars: HashMap<String, String> = HashMap::new();
    for (name, value) in env_vars {
        let is_trace_key = ["traceparent", "tracestate", "baggage", "b3"]
            .iter()
            .any(|prefix| name.starts_with(*prefix));
        if is_trace_key {
            trace_vars.insert(name.clone(), value.clone());
        }
    }

    if trace_vars.is_empty() {
        tracing::debug!("No trace context found in provided environment variables");
        return;
    }

    let names: Vec<_> = trace_vars.keys().collect();
    tracing::debug!(
        "Extracting trace context from {} environment variables: {:?}",
        trace_vars.len(),
        names
    );

    let carrier = EnvExtractor(&trace_vars);
    let parent_cx =
        opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&carrier));
    let current = tracing::Span::current();
    tracing::debug!("Setting parent context on span: {:?}", current.id());
    let _ = current.set_parent(parent_cx);
}
/// Sets every variable produced by `inject_trace_env` on `command` and returns it.
pub fn inject_trace_env_to_cmd(mut command: std::process::Command) -> std::process::Command {
    command.envs(inject_trace_env());
    command
}
/// Returns the baggage value stored under `key` in the current OpenTelemetry context,
/// rendered as a string, or `None` when the key is absent.
pub fn get_baggage(key: &str) -> Option<String> {
    use opentelemetry::{baggage::BaggageExt, Context};

    let cx = Context::current();
    let value = cx.baggage().get(key)?;
    Some(value.to_string())
}
/// Returns every baggage entry of the current OpenTelemetry context as string pairs;
/// entry metadata is discarded.
pub fn get_all_baggage() -> HashMap<String, String> {
    use opentelemetry::{baggage::BaggageExt, Context};

    let cx = Context::current();
    let mut entries = HashMap::new();
    for (key, (value, _metadata)) in cx.baggage() {
        entries.insert(key.to_string(), value.to_string());
    }
    entries
}
fn parse_protocol_env(
var_name: &str,
default: crate::rialo_opentelemetry::Protocol,
) -> crate::rialo_opentelemetry::Protocol {
std::env::var(var_name)
.ok()
.and_then(|v| match v.to_lowercase().as_str() {
"grpc" => Some(crate::rialo_opentelemetry::Protocol::Grpc),
"http/protobuf" => Some(crate::rialo_opentelemetry::Protocol::HttpProtobuf),
"http/json" => Some(crate::rialo_opentelemetry::Protocol::HttpJson),
_ => None,
})
.unwrap_or(default)
}
fn parse_sampler_env(
var_name: &str,
arg_var_name: &str,
default: crate::rialo_opentelemetry::Sampling,
) -> crate::rialo_opentelemetry::Sampling {
std::env::var(var_name)
.ok()
.and_then(|v| match v.to_lowercase().as_str() {
"always_on" | "parentbased_always_on" => {
Some(crate::rialo_opentelemetry::Sampling::AlwaysOn)
}
"always_off" | "parentbased_always_off" => {
Some(crate::rialo_opentelemetry::Sampling::AlwaysOff)
}
"traceidratio" | "parentbased_traceidratio" => {
let ratio = std::env::var(arg_var_name)
.ok()
.and_then(|arg| arg.parse::<f64>().ok())
.unwrap_or(0.05);
let clamped_ratio = ratio.clamp(0.0, 1.0);
Some(crate::rialo_opentelemetry::Sampling::Ratio(clamped_ratio))
}
_ => None,
})
.unwrap_or(default)
}
#[cfg(test)]
mod tests {
use std::env;
use serial_test::serial;
use super::*;
use crate::parse_env::parse_key_value_string;
/// With the relevant `OTEL_*` variables cleared, the configuration uses built-in defaults.
#[test]
#[serial]
fn test_otlp_config_default() {
    // Clear everything this test observes so only defaults are visible.
    for name in [
        "OTEL_SERVICE_NAME",
        "OTEL_SERVICE_VERSION",
        "OTEL_EXPORTER_OTLP_ENDPOINT",
        "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT",
        "OTEL_EXPORTER_OTLP_TRACES_HEADERS",
        "OTEL_EXPORTER_OTLP_HEADERS",
        "OTEL_TRACES_ENABLED",
        "OTEL_LOG_LEVEL",
        "OTEL_EXPORTER_OTLP_PROTOCOL",
        "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL",
        "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL",
        "OTEL_RESOURCE_ATTRIBUTES",
    ] {
        env::remove_var(name);
    }

    let config = OtlpConfig::default();
    let expected_endpoint = "http://localhost:4318".to_string();

    assert_eq!(config.service_name, "rialo");
    assert_eq!(config.service_version, "unknown");
    assert_eq!(config.log_level, "info");
    assert!(config.traces_enabled);
    assert!(config.enable_console);
    assert_eq!(config.effective_traces_endpoint(), Some(&expected_endpoint));
    assert!(config.effective_traces_headers().is_empty());
    assert_eq!(config.exporter_protocol, Protocol::HttpProtobuf);
}
/// Values set through `OTEL_*` variables are picked up by `OtlpConfig::default`.
#[test]
#[serial]
fn test_otlp_config_from_env() {
    let overrides = [
        ("OTEL_SERVICE_NAME", "test-service"),
        ("OTEL_SERVICE_VERSION", "1.2.3"),
        (
            "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT",
            "https://api.honeycomb.io/v1/traces",
        ),
        (
            "OTEL_EXPORTER_OTLP_TRACES_HEADERS",
            "x-honeycomb-team=your-api-key",
        ),
        ("OTEL_TRACES_ENABLED", "true"),
        ("OTEL_LOG_LEVEL", "debug"),
    ];
    for (name, value) in overrides {
        env::set_var(name, value);
    }

    let config = OtlpConfig::default();
    let expected_endpoint = "https://api.honeycomb.io/v1/traces".to_string();
    let expected_team = "your-api-key".to_string();

    assert_eq!(config.service_name, "test-service");
    assert_eq!(config.service_version, "1.2.3");
    assert_eq!(config.effective_traces_endpoint(), Some(&expected_endpoint));
    assert_eq!(
        config.traces_headers.get("x-honeycomb-team"),
        Some(&expected_team)
    );
    assert!(config.traces_enabled);
    assert_eq!(config.log_level, "debug");

    // Restore the environment for the tests that follow.
    for (name, _) in overrides {
        env::remove_var(name);
    }
}
/// Builder methods override the values they target.
///
/// Marked `#[serial]` because `OtlpConfig::new()` reads the process environment, which
/// other serial tests in this module mutate.
#[test]
#[serial]
fn test_otlp_config_builder() {
    let config = OtlpConfig::new()
        .with_service_name("custom-service")
        .with_service_version("2.0.0")
        .with_traces_endpoint("https://api.honeycomb.io/v1/traces")
        .with_log_level("debug")
        .with_console_enabled(false)
        .with_traces_enabled(true);
    assert_eq!(config.service_name, "custom-service");
    assert_eq!(config.service_version, "2.0.0");
    assert_eq!(
        config.effective_traces_endpoint(),
        Some(&"https://api.honeycomb.io/v1/traces".to_string())
    );
    assert_eq!(config.log_level, "debug");
    assert!(!config.enable_console);
    assert!(config.traces_enabled);
}
/// Comma-separated `key=value` pairs are parsed into individual entries.
#[test]
fn test_parse_key_value_string() {
    let parsed = parse_key_value_string(
        "x-honeycomb-team=your-api-key,x-honeycomb-dataset=rialo,content-type=application/json",
    );
    let expected = [
        ("x-honeycomb-team", "your-api-key"),
        ("x-honeycomb-dataset", "rialo"),
        ("content-type", "application/json"),
    ];
    assert_eq!(parsed.len(), 3);
    for (key, value) in expected.iter() {
        assert_eq!(parsed.get(*key), Some(&value.to_string()));
    }
}
/// Semicolon-separated `key=value` pairs are parsed into individual entries.
#[test]
fn test_parse_key_value_string_semicolon() {
    let parsed = parse_key_value_string("x-honeycomb-team=your-api-key;x-honeycomb-dataset=rialo");
    let expected = [
        ("x-honeycomb-team", "your-api-key"),
        ("x-honeycomb-dataset", "rialo"),
    ];
    assert_eq!(parsed.len(), 2);
    for (key, value) in expected.iter() {
        assert_eq!(parsed.get(*key), Some(&value.to_string()));
    }
}
/// An empty input string yields no entries.
#[test]
fn test_parse_key_value_string_empty() {
    let parsed = parse_key_value_string("");
    assert_eq!(parsed.len(), 0);
}
/// The traces endpoint takes precedence over the general endpoint, and `None` is
/// returned only when both are unset.
#[test]
#[serial]
fn test_effective_endpoints() {
    // The protocol variables select which default endpoint is used, so they must be
    // cleared as well for the 4318 default asserted below to hold.
    for name in [
        "OTEL_EXPORTER_OTLP_ENDPOINT",
        "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT",
        "OTEL_EXPORTER_OTLP_PROTOCOL",
        "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL",
    ] {
        env::remove_var(name);
    }
    let mut config = OtlpConfig::default();
    assert_eq!(
        config.effective_traces_endpoint(),
        Some(&"http://localhost:4318".to_string())
    );
    config.exporter_endpoint = Some("https://general.endpoint".to_string());
    assert_eq!(
        config.effective_traces_endpoint(),
        Some(&"https://general.endpoint".to_string())
    );
    config.traces_exporter_endpoint = Some("https://traces.endpoint".to_string());
    assert_eq!(
        config.effective_traces_endpoint(),
        Some(&"https://traces.endpoint".to_string())
    );
    config.exporter_endpoint = None;
    assert_eq!(
        config.effective_traces_endpoint(),
        Some(&"https://traces.endpoint".to_string())
    );
    config.traces_exporter_endpoint = None;
    assert!(config.effective_traces_endpoint().is_none());
}
/// General and traces headers are merged, with the traces value winning on a shared key.
#[test]
#[serial]
fn test_effective_headers() {
    env::remove_var("OTEL_EXPORTER_OTLP_HEADERS");
    env::remove_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
    let mut config = OtlpConfig::default();

    for (key, value) in [("general", "value1"), ("shared", "general_value")].iter() {
        config.headers.insert(key.to_string(), value.to_string());
    }
    for (key, value) in [("traces", "value2"), ("shared", "traces_value")].iter() {
        config
            .traces_headers
            .insert(key.to_string(), value.to_string());
    }

    let merged = config.effective_traces_headers();
    assert_eq!(merged.len(), 3);
    assert_eq!(merged.get("general"), Some(&"value1".to_string()));
    assert_eq!(merged.get("traces"), Some(&"value2".to_string()));
    assert_eq!(merged.get("shared"), Some(&"traces_value".to_string()));
}
/// Smoke test: injecting headers from inside a span completes without panicking.
///
/// The empty exporter endpoint is rejected by config validation, so `init_otel`
/// returns an error here; that result is deliberately ignored.
#[tokio::test]
#[serial]
async fn test_inject_trace_headers() {
    use tracing::Instrument;

    let otlp_config = OtlpConfig::new()
        .with_service_name("test-service")
        .with_exporter_endpoint("".to_string())
        .with_traces_enabled(true);
    let _otel_result = init_otel(&otlp_config).await;

    let span = tracing::info_span!("test_span", test_field = "test_value");
    let headers = async move { inject_trace_headers() }
        .instrument(span)
        .await;
    println!("Injected headers: {:?}", headers);

    if headers.is_empty() {
        return;
    }
    let recognised = headers.contains_key("traceparent")
        || headers.contains_key("tracestate")
        || headers.keys().any(|k| k.starts_with("x-trace"));
    if !recognised {
        println!("Warning: Headers present but no recognized trace headers found");
    }
}
/// Injecting outside any span must not panic and must not produce a `traceparent`.
///
/// Replaces the former `is_empty() || !is_empty()` assertion, which could never fail.
#[test]
fn test_inject_trace_headers_no_span() {
    let headers = inject_trace_headers();
    println!("Headers outside span: {:?}", headers);
    assert!(
        !headers.contains_key("traceparent"),
        "no traceparent should be injected without an active span"
    );
}
/// Smoke test: injecting environment variables from inside a span completes, and any
/// `traceparent` produced has the four-part, version-`00` shape.
///
/// The empty exporter endpoint is rejected by config validation, so `init_otel`
/// returns an error here; that result is deliberately ignored.
#[tokio::test]
#[serial]
async fn test_inject_trace_env() {
    use tracing::Instrument;

    let otlp_config = OtlpConfig::new()
        .with_service_name("test-service")
        .with_exporter_endpoint("".to_string())
        .with_traces_enabled(true);
    let _otel_result = init_otel(&otlp_config).await;

    let span = tracing::info_span!("test_env_span", test_field = "test_value");
    let env_vars = async move { inject_trace_env() }.instrument(span).await;
    println!("Injected environment variables: {:?}", env_vars);

    if env_vars.is_empty() {
        return;
    }
    let recognised = env_vars.contains_key("traceparent") || env_vars.contains_key("tracestate");
    if !recognised {
        println!("Warning: Environment variables present but no recognized trace context found");
    }
    if let Some(traceparent) = env_vars.get("traceparent") {
        let parts: Vec<&str> = traceparent.split('-').collect();
        assert_eq!(
            parts.len(),
            4,
            "traceparent should have 4 parts separated by hyphens"
        );
        assert_eq!(parts[0], "00", "traceparent should start with version '00'");
    }
}
/// Injecting outside any span must not panic and must not produce a `traceparent`.
///
/// Replaces the former `is_empty() || !is_empty()` assertion, which could never fail.
#[test]
fn test_inject_trace_env_no_span() {
    let env_vars = inject_trace_env();
    println!("Environment variables outside span: {:?}", env_vars);
    assert!(
        !env_vars.contains_key("traceparent"),
        "no traceparent should be injected without an active span"
    );
}
/// Smoke test: extracting from the process environment does not panic.
///
/// Marked `#[serial]` because it reads the process environment, which other serial
/// tests in this module mutate.
#[test]
#[serial]
fn test_extract_and_set_trace_context_env_no_vars() {
    extract_and_set_trace_context_env();
}
/// Smoke test: extraction from an explicit map handles both an empty map and a map
/// holding trace variables next to an unrelated entry.
#[test]
fn test_extract_and_set_trace_context_from_env_map() {
    let nothing: HashMap<String, String> = HashMap::new();
    extract_and_set_trace_context_from_env_map(&nothing);

    let mock_env: HashMap<String, String> = [
        (
            "traceparent",
            "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
        ),
        ("tracestate", "rojo=00f067aa0ba902b7"),
        ("irrelevant_var", "should_be_ignored"),
    ]
    .iter()
    .map(|(name, value)| (name.to_string(), value.to_string()))
    .collect();
    extract_and_set_trace_context_from_env_map(&mock_env);
}
/// Smoke test: variables injected under a "parent" span can be fed back into
/// extraction under a "child" span without panicking.
///
/// Marked `#[serial]` because `OtlpConfig::new()` reads the process environment, which
/// other serial tests in this module mutate.
#[test]
#[serial]
fn test_env_context_round_trip() {
    use tracing::Instrument;
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        // The empty endpoint is rejected by validation; the result is ignored on purpose.
        let otlp_config = OtlpConfig::new()
            .with_service_name("test-parent")
            .with_exporter_endpoint("".to_string())
            .with_traces_enabled(true);
        let _otel_result = init_otel(&otlp_config).await;
        let parent_span = tracing::info_span!("parent_process", process_id = "parent");
        let injected_env = async move { inject_trace_env() }
            .instrument(parent_span)
            .await;
        println!("Parent injected env vars: {:?}", injected_env);
        let child_span = tracing::info_span!("child_process", process_id = "child");
        let _child_guard = child_span.enter();
        extract_and_set_trace_context_from_env_map(&injected_env);
    });
}
/// Injecting trace variables leaves the command's program untouched.
#[test]
fn test_inject_trace_env_to_cmd() {
    use std::process::Command;

    let prepared = inject_trace_env_to_cmd(Command::new("echo"));
    assert_eq!(prepared.get_program(), "echo");
}
/// Injecting trace variables from inside a span leaves the command's program untouched.
///
/// The empty exporter endpoint is rejected by config validation, so `init_otel`
/// returns an error here; that result is deliberately ignored.
#[tokio::test]
#[serial]
async fn test_inject_trace_env_to_cmd_with_span() {
    use std::process::Command;
    use tracing::Instrument;

    let otlp_config = OtlpConfig::new()
        .with_service_name("test-cmd")
        .with_exporter_endpoint("".to_string())
        .with_traces_enabled(true);
    let _otel_result = init_otel(&otlp_config).await;

    let span = tracing::info_span!("cmd_test");
    let build_cmd = async move { inject_trace_env_to_cmd(Command::new("test-command")) };
    let prepared = build_cmd.instrument(span).await;
    assert_eq!(prepared.get_program(), "test-command");
}
/// With no baggage attached, lookups return `None` and the full listing is empty.
#[test]
fn test_baggage_read_operations() {
    assert_eq!(get_baggage("non_existent_key"), None);
    let everything = get_all_baggage();
    assert!(everything.is_empty());
}
/// Exercises `get_baggage` / `get_all_baggage` against contexts attached in nested
/// scopes. Each `attach()` guard is held for its enclosing block, so the order and
/// scoping of the blocks below is significant.
#[tokio::test]
#[serial]
async fn test_baggage_operations() {
// The empty exporter endpoint is rejected by config validation, so this call returns
// an error; the test only prints the outcome and continues.
let otlp_config = OtlpConfig::new()
.with_service_name("test-baggage")
.with_exporter_endpoint("".to_string())
.with_traces_enabled(true);
let otel_result = init_otel(&otlp_config).await;
println!(
"OpenTelemetry initialization result: {:?}",
otel_result.is_ok()
);
if let Ok(result) = &otel_result {
println!("Tracer available: {}", result.tracer.is_some());
}
use tracing::Instrument;
let test_span = tracing::info_span!("baggage_test");
async {
use opentelemetry::{
baggage::{Baggage, BaggageExt, BaggageMetadata},
Context, Key, Value,
};
// Outer context: a single `test_key` entry, attached for the rest of this async block.
let mut baggage = Baggage::new();
let _ = baggage.insert_with_metadata(
Key::new("test_key".to_string()),
Value::from("test_value".to_string()),
BaggageMetadata::default(),
);
let context = Context::current().with_baggage(baggage);
let _guard = context.attach();
let current_context = Context::current();
let baggage = current_context.baggage();
println!("Direct context baggage: {:?}", baggage.get("test_key"));
{
// Inner context: replaces the baggage with `user_id` and `session_id`.
let mut new_baggage = Baggage::new();
let _ = new_baggage.insert_with_metadata(
Key::new("user_id".to_string()),
Value::from("12345".to_string()),
BaggageMetadata::default(),
);
let _ = new_baggage.insert_with_metadata(
Key::new("session_id".to_string()),
Value::from("session-abc".to_string()),
BaggageMetadata::default(),
);
let new_context = Context::current().with_baggage(new_baggage);
let _inner_guard = new_context.attach();
let user_id = get_baggage("user_id");
let session_id = get_baggage("session_id");
println!(
"After setting baggage in context, get_baggage('user_id') = {:?}",
user_id
);
println!(
"After setting baggage in context, get_baggage('session_id') = {:?}",
session_id
);
assert_eq!(user_id, Some("12345".to_string()));
assert_eq!(session_id, Some("session-abc".to_string()));
assert_eq!(get_baggage("non_existent"), None);
let all_baggage = get_all_baggage();
assert!(all_baggage.contains_key("user_id"));
assert!(all_baggage.contains_key("session_id"));
assert_eq!(all_baggage.get("user_id"), Some(&"12345".to_string()));
assert_eq!(
all_baggage.get("session_id"),
Some(&"session-abc".to_string())
);
{
// Baggage holding only `user_id`: `session_id` must no longer be visible.
let mut new_baggage = Baggage::new();
let _ = new_baggage.insert_with_metadata(
Key::new("user_id".to_string()),
Value::from("12345".to_string()),
BaggageMetadata::default(),
);
let new_context = Context::current().with_baggage(new_baggage);
let _inner_guard = new_context.attach();
assert_eq!(get_baggage("session_id"), None);
assert_eq!(get_baggage("user_id"), Some("12345".to_string()));
}
{
// Empty baggage: nothing must be visible while this guard is held.
let empty_baggage = Baggage::new();
let new_context = Context::current().with_baggage(empty_baggage);
let _inner_guard = new_context.attach();
assert_eq!(get_baggage("user_id"), None);
assert!(get_all_baggage().is_empty());
}
}
}
.instrument(test_span)
.await;
}
/// Regression test (per its name): calling `follows_from` with the id of a span that is
/// being closed on another thread must not panic.
///
/// The timing is deliberate: the extra layer delays `on_close` by 20ms, while the main
/// thread waits only 5ms before referencing the closing span's id.
#[test]
fn test_follows_from_during_close_no_panic() {
use opentelemetry::trace::TracerProvider as _;
use tracing::Subscriber;
use tracing_subscriber::prelude::*;
// Layer whose only job is to slow down span close.
#[derive(Clone)]
struct WaitOnCloseLayer;
impl<S: Subscriber> tracing_subscriber::Layer<S> for WaitOnCloseLayer {
fn on_close(&self, _id: tracing::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
std::thread::sleep(std::time::Duration::from_millis(20));
}
}
// Exporter that accepts every batch and does nothing with it.
#[derive(Clone, Default, Debug)]
struct NoopExporter;
impl opentelemetry_sdk::trace::SpanExporter for NoopExporter {
async fn export(
&self,
_batch: Vec<opentelemetry_sdk::trace::SpanData>,
) -> opentelemetry_sdk::error::OTelSdkResult {
Ok(())
}
}
let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_simple_exporter(NoopExporter)
.build();
let tracer = provider.tracer("regression-test");
// Registry with the OpenTelemetry layer plus the slow-close layer.
let subscriber = std::sync::Arc::new(
tracing_subscriber::registry()
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.with(WaitOnCloseLayer),
);
tracing::subscriber::with_default(subscriber, || {
let cause_span = tracing::debug_span!("cause");
let cause_id = cause_span.id().unwrap();
std::thread::scope(|scope| {
// Drop (and thereby close) the cause span on a second thread.
scope.spawn(|| {
drop(cause_span);
});
// Give the other thread time to start closing, then link to the closing span.
std::thread::sleep(std::time::Duration::from_millis(5));
let effect_span = tracing::debug_span!("effect");
effect_span.follows_from(cause_id);
});
});
let _ = provider.shutdown();
}
}