use super::IdGenerator;
use crate::error::{OTelSdkError, OTelSdkResult};
use crate::trace::{
BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SdkTracer, SimpleSpanProcessor,
SpanLimits,
};
use crate::Resource;
use crate::{trace::SpanExporter, trace::SpanProcessor};
use opentelemetry::otel_debug;
use opentelemetry::{otel_info, InstrumentationScope};
use std::borrow::Cow;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
static PROVIDER_RESOURCE: OnceLock<Resource> = OnceLock::new();
static NOOP_TRACER_PROVIDER: OnceLock<SdkTracerProvider> = OnceLock::new();
#[inline]
fn noop_tracer_provider() -> &'static SdkTracerProvider {
NOOP_TRACER_PROVIDER.get_or_init(|| {
SdkTracerProvider {
inner: Arc::new(TracerProviderInner {
processors: Vec::new(),
config: Config {
sampler: Box::new(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
id_generator: Box::<RandomIdGenerator>::default(),
span_limits: SpanLimits::default(),
resource: Cow::Owned(Resource::empty()),
},
is_shutdown: AtomicBool::new(true),
}),
}
})
}
#[derive(Debug)]
pub(crate) struct TracerProviderInner {
processors: Vec<Box<dyn SpanProcessor>>,
config: crate::trace::Config,
is_shutdown: AtomicBool,
}
impl TracerProviderInner {
pub(crate) fn shutdown_with_timeout(&self, timeout: Duration) -> Vec<OTelSdkResult> {
let mut results = vec![];
for processor in &self.processors {
let result = processor.shutdown_with_timeout(timeout);
if let Err(err) = &result {
otel_debug!(name: "TracerProvider.Drop.ShutdownError",
error = format!("{err}"));
}
results.push(result);
}
results
}
pub(crate) fn shutdown(&self) -> Vec<OTelSdkResult> {
self.shutdown_with_timeout(Duration::from_secs(5))
}
}
impl Drop for TracerProviderInner {
fn drop(&mut self) {
if !self.is_shutdown.load(Ordering::Relaxed) {
let _ = self.shutdown(); } else {
otel_debug!(
name: "TracerProvider.Drop.AlreadyShutdown",
message = "TracerProvider was already shut down; drop will not attempt shutdown again."
);
}
}
}
#[derive(Clone, Debug)]
pub struct SdkTracerProvider {
inner: Arc<TracerProviderInner>,
}
impl Default for SdkTracerProvider {
fn default() -> Self {
SdkTracerProvider::builder().build()
}
}
impl SdkTracerProvider {
pub(crate) fn new(inner: TracerProviderInner) -> Self {
SdkTracerProvider {
inner: Arc::new(inner),
}
}
pub fn builder() -> TracerProviderBuilder {
TracerProviderBuilder::default()
}
pub(crate) fn span_processors(&self) -> &[Box<dyn SpanProcessor>] {
&self.inner.processors
}
pub(crate) fn config(&self) -> &crate::trace::Config {
&self.inner.config
}
pub(crate) fn is_shutdown(&self) -> bool {
self.inner.is_shutdown.load(Ordering::Relaxed)
}
pub fn force_flush(&self) -> OTelSdkResult {
let result: Vec<_> = self
.span_processors()
.iter()
.map(|processor| processor.force_flush())
.collect();
if result.iter().all(|r| r.is_ok()) {
Ok(())
} else {
Err(OTelSdkError::InternalFailure(format!("errs: {result:?}")))
}
}
pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
if self
.inner
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
let results = self.inner.shutdown_with_timeout(timeout);
if results.iter().all(|res| res.is_ok()) {
Ok(())
} else {
Err(OTelSdkError::InternalFailure(format!(
"Shutdown errors: {:?}",
results
.into_iter()
.filter_map(Result::err)
.collect::<Vec<_>>() )))
}
} else {
Err(OTelSdkError::AlreadyShutdown)
}
}
pub fn shutdown(&self) -> OTelSdkResult {
self.shutdown_with_timeout(Duration::from_secs(5))
}
}
impl opentelemetry::trace::TracerProvider for SdkTracerProvider {
type Tracer = SdkTracer;
fn tracer(&self, name: impl Into<Cow<'static, str>>) -> Self::Tracer {
let scope = InstrumentationScope::builder(name).build();
self.tracer_with_scope(scope)
}
fn tracer_with_scope(&self, scope: InstrumentationScope) -> Self::Tracer {
if self.inner.is_shutdown.load(Ordering::Relaxed) {
return SdkTracer::new(scope, noop_tracer_provider().clone());
}
if scope.name().is_empty() {
otel_info!(name: "TracerNameEmpty", message = "Tracer name is empty; consider providing a meaningful name. Tracer will function normally and the provided name will be used as-is.");
};
SdkTracer::new(scope, self.clone())
}
}
#[derive(Debug, Default)]
pub struct TracerProviderBuilder {
processors: Vec<Box<dyn SpanProcessor>>,
config: crate::trace::Config,
resource: Option<Resource>,
}
impl TracerProviderBuilder {
pub fn with_simple_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
let simple = SimpleSpanProcessor::new(exporter);
self.with_span_processor(simple)
}
pub fn with_batch_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
let batch = BatchSpanProcessor::builder(exporter).build();
self.with_span_processor(batch)
}
pub fn with_span_processor<T: SpanProcessor + 'static>(self, processor: T) -> Self {
let mut processors = self.processors;
processors.push(Box::new(processor));
TracerProviderBuilder { processors, ..self }
}
pub fn with_sampler<T: crate::trace::ShouldSample + 'static>(mut self, sampler: T) -> Self {
self.config.sampler = Box::new(sampler);
self
}
pub fn with_id_generator<T: IdGenerator + 'static>(mut self, id_generator: T) -> Self {
self.config.id_generator = Box::new(id_generator);
self
}
pub fn with_max_events_per_span(mut self, max_events: u32) -> Self {
self.config.span_limits.max_events_per_span = max_events;
self
}
pub fn with_max_attributes_per_span(mut self, max_attributes: u32) -> Self {
self.config.span_limits.max_attributes_per_span = max_attributes;
self
}
pub fn with_max_links_per_span(mut self, max_links: u32) -> Self {
self.config.span_limits.max_links_per_span = max_links;
self
}
pub fn with_max_attributes_per_event(mut self, max_attributes: u32) -> Self {
self.config.span_limits.max_attributes_per_event = max_attributes;
self
}
pub fn with_max_attributes_per_link(mut self, max_attributes: u32) -> Self {
self.config.span_limits.max_attributes_per_link = max_attributes;
self
}
pub fn with_span_limits(mut self, span_limits: SpanLimits) -> Self {
self.config.span_limits = span_limits;
self
}
pub fn with_resource(self, resource: Resource) -> Self {
let resource = match self.resource {
Some(existing) => Some(existing.merge(&resource)),
None => Some(resource),
};
TracerProviderBuilder { resource, ..self }
}
pub fn build(self) -> SdkTracerProvider {
let mut config = self.config;
if let Some(resource) = self.resource {
config.resource = Cow::Owned(resource);
};
if matches!(config.resource, Cow::Owned(_)) {
config.resource =
match PROVIDER_RESOURCE.get_or_init(|| config.resource.clone().into_owned()) {
static_resource if *static_resource == *config.resource.as_ref() => {
Cow::Borrowed(static_resource)
}
_ => config.resource, };
}
let mut processors = self.processors;
for p in &mut processors {
p.set_resource(config.resource.as_ref());
}
let is_shutdown = AtomicBool::new(false);
SdkTracerProvider::new(TracerProviderInner {
processors,
config,
is_shutdown,
})
}
}
#[cfg(test)]
mod tests {
use crate::error::{OTelSdkError, OTelSdkResult};
use crate::resource::{
SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
};
use crate::trace::provider::TracerProviderInner;
use crate::trace::{Config, Span, SpanProcessor};
use crate::trace::{SdkTracerProvider, SpanData};
use crate::Resource;
use opentelemetry::trace::{Tracer, TracerProvider};
use opentelemetry::{Context, Key, KeyValue, Value};
use std::env;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
#[derive(Default, Debug)]
struct AssertInfo {
started_span: AtomicU32,
is_shutdown: AtomicBool,
}
#[derive(Default, Debug, Clone)]
struct SharedAssertInfo(Arc<AssertInfo>);
impl SharedAssertInfo {
fn started_span_count(&self, count: u32) -> bool {
self.0.started_span.load(Ordering::SeqCst) == count
}
}
#[derive(Debug)]
struct TestSpanProcessor {
success: bool,
assert_info: SharedAssertInfo,
}
impl TestSpanProcessor {
fn new(success: bool) -> TestSpanProcessor {
TestSpanProcessor {
success,
assert_info: SharedAssertInfo::default(),
}
}
fn assert_info(&self) -> SharedAssertInfo {
self.assert_info.clone()
}
}
impl SpanProcessor for TestSpanProcessor {
fn on_start(&self, _span: &mut Span, _cx: &Context) {
self.assert_info
.0
.started_span
.fetch_add(1, Ordering::SeqCst);
}
fn on_end(&self, _span: SpanData) {
}
fn force_flush(&self) -> OTelSdkResult {
if self.success {
Ok(())
} else {
Err(OTelSdkError::InternalFailure("cannot export".into()))
}
}
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) {
Ok(())
} else {
let _ = self.assert_info.0.is_shutdown.compare_exchange(
false,
true,
Ordering::SeqCst,
Ordering::SeqCst,
);
self.force_flush()
}
}
}
#[test]
fn test_force_flush() {
let tracer_provider = super::SdkTracerProvider::new(TracerProviderInner {
processors: vec![
Box::from(TestSpanProcessor::new(true)),
Box::from(TestSpanProcessor::new(false)),
],
config: Default::default(),
is_shutdown: AtomicBool::new(false),
});
let results = tracer_provider.force_flush();
assert!(results.is_err());
}
#[test]
fn test_tracer_provider_default_resource() {
let assert_resource = |provider: &super::SdkTracerProvider,
resource_key: &'static str,
expect: Option<&'static str>| {
assert_eq!(
provider
.config()
.resource
.get(&Key::from_static_str(resource_key))
.map(|v| v.to_string()),
expect.map(|s| s.to_string())
);
};
let assert_telemetry_resource = |provider: &super::SdkTracerProvider| {
assert_eq!(
provider
.config()
.resource
.get(&TELEMETRY_SDK_LANGUAGE.into()),
Some(Value::from("rust"))
);
assert_eq!(
provider.config().resource.get(&TELEMETRY_SDK_NAME.into()),
Some(Value::from("opentelemetry"))
);
assert_eq!(
provider
.config()
.resource
.get(&TELEMETRY_SDK_VERSION.into()),
Some(Value::from(env!("CARGO_PKG_VERSION")))
);
};
temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
let default_config_provider = super::SdkTracerProvider::builder().build();
assert_resource(
&default_config_provider,
SERVICE_NAME,
Some("unknown_service"),
);
assert_telemetry_resource(&default_config_provider);
});
let custom_config_provider = super::SdkTracerProvider::builder()
.with_resource(
Resource::builder_empty()
.with_service_name("test_service")
.build(),
)
.build();
assert_resource(&custom_config_provider, SERVICE_NAME, Some("test_service"));
assert_eq!(custom_config_provider.config().resource.len(), 1);
temp_env::with_var(
"OTEL_RESOURCE_ATTRIBUTES",
Some("key1=value1, k2, k3=value2"),
|| {
let env_resource_provider = super::SdkTracerProvider::builder().build();
assert_resource(
&env_resource_provider,
SERVICE_NAME,
Some("unknown_service"),
);
assert_resource(&env_resource_provider, "key1", Some("value1"));
assert_resource(&env_resource_provider, "k3", Some("value2"));
assert_telemetry_resource(&env_resource_provider);
assert_eq!(env_resource_provider.config().resource.len(), 6);
},
);
temp_env::with_var(
"OTEL_RESOURCE_ATTRIBUTES",
Some("my-custom-key=env-val,k2=value2"),
|| {
let user_provided_resource_config_provider = super::SdkTracerProvider::builder()
.with_resource(
Resource::builder()
.with_attributes([
KeyValue::new("my-custom-key", "my-custom-value"),
KeyValue::new("my-custom-key2", "my-custom-value2"),
])
.build(),
)
.build();
assert_resource(
&user_provided_resource_config_provider,
SERVICE_NAME,
Some("unknown_service"),
);
assert_resource(
&user_provided_resource_config_provider,
"my-custom-key",
Some("my-custom-value"),
);
assert_resource(
&user_provided_resource_config_provider,
"my-custom-key2",
Some("my-custom-value2"),
);
assert_resource(
&user_provided_resource_config_provider,
"k2",
Some("value2"),
);
assert_telemetry_resource(&user_provided_resource_config_provider);
assert_eq!(
user_provided_resource_config_provider
.config()
.resource
.len(),
7
);
},
);
let no_service_name = super::SdkTracerProvider::builder()
.with_resource(Resource::empty())
.build();
assert_eq!(no_service_name.config().resource.len(), 0)
}
#[test]
fn test_shutdown_noops() {
let processor = TestSpanProcessor::new(false);
let assert_handle = processor.assert_info();
let tracer_provider = super::SdkTracerProvider::new(TracerProviderInner {
processors: vec![Box::from(processor)],
config: Default::default(),
is_shutdown: AtomicBool::new(false),
});
let test_tracer_1 = tracer_provider.tracer("test1");
let _ = test_tracer_1.start("test");
assert!(assert_handle.started_span_count(1));
let _ = test_tracer_1.start("test");
assert!(assert_handle.started_span_count(2));
let shutdown = |tracer_provider: super::SdkTracerProvider| {
let _ = tracer_provider.shutdown(); };
shutdown(tracer_provider.clone());
let noop_tracer = tracer_provider.tracer("noop");
let _ = noop_tracer.start("test");
assert!(assert_handle.started_span_count(2));
assert!(noop_tracer.provider().is_shutdown());
let _ = test_tracer_1.start("test");
assert!(assert_handle.started_span_count(2));
assert!(test_tracer_1.provider().is_shutdown());
}
#[test]
fn with_resource_multiple_calls_ensure_additive() {
let resource = SdkTracerProvider::builder()
.with_resource(Resource::new(vec![KeyValue::new("key1", "value1")]))
.with_resource(Resource::new(vec![KeyValue::new("key2", "value2")]))
.with_resource(
Resource::builder_empty()
.with_schema_url(vec![], "http://example.com")
.build(),
)
.with_resource(Resource::new(vec![KeyValue::new("key3", "value3")]))
.build()
.inner
.config
.resource
.clone()
.into_owned();
assert_eq!(
resource.get(&Key::from_static_str("key1")),
Some(Value::from("value1"))
);
assert_eq!(
resource.get(&Key::from_static_str("key2")),
Some(Value::from("value2"))
);
assert_eq!(
resource.get(&Key::from_static_str("key3")),
Some(Value::from("value3"))
);
assert_eq!(resource.schema_url(), Some("http://example.com"));
}
#[derive(Debug)]
struct CountingShutdownProcessor {
shutdown_count: Arc<AtomicU32>,
}
impl CountingShutdownProcessor {
fn new(shutdown_count: Arc<AtomicU32>) -> Self {
CountingShutdownProcessor { shutdown_count }
}
}
impl SpanProcessor for CountingShutdownProcessor {
fn on_start(&self, _span: &mut Span, _cx: &Context) {
}
fn on_end(&self, _span: SpanData) {
}
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
self.shutdown_count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
#[test]
fn drop_test_with_multiple_providers() {
let shutdown_count = Arc::new(AtomicU32::new(0));
{
let shared_inner = Arc::new(TracerProviderInner {
processors: vec![Box::new(CountingShutdownProcessor::new(
shutdown_count.clone(),
))],
config: Config::default(),
is_shutdown: AtomicBool::new(false),
});
{
let tracer_provider1 = super::SdkTracerProvider {
inner: shared_inner.clone(),
};
let tracer_provider2 = super::SdkTracerProvider {
inner: shared_inner.clone(),
};
let tracer1 = tracer_provider1.tracer("test-tracer1");
let tracer2 = tracer_provider2.tracer("test-tracer2");
let _span1 = tracer1.start("span1");
let _span2 = tracer2.start("span2");
}
assert_eq!(shutdown_count.load(Ordering::SeqCst), 0);
}
assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
}
#[test]
fn drop_after_shutdown_test_with_multiple_providers() {
let shutdown_count = Arc::new(AtomicU32::new(0));
let shared_inner = Arc::new(TracerProviderInner {
processors: vec![Box::new(CountingShutdownProcessor::new(
shutdown_count.clone(),
))],
config: Config::default(),
is_shutdown: AtomicBool::new(false),
});
{
let tracer_provider1 = super::SdkTracerProvider {
inner: shared_inner.clone(),
};
let tracer_provider2 = super::SdkTracerProvider {
inner: shared_inner.clone(),
};
let shutdown_result = tracer_provider1.shutdown();
assert!(shutdown_result.is_ok());
assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
let shutdown_result2 = tracer_provider2.shutdown();
assert!(shutdown_result2.is_err());
assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
}
assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
}
}