1use super::IdGenerator;
2use crate::error::{OTelSdkError, OTelSdkResult};
3use crate::trace::{
68 BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SdkTracer, SimpleSpanProcessor,
69 SpanLimits,
70};
71use crate::Resource;
72use crate::{trace::SpanExporter, trace::SpanProcessor};
73use opentelemetry::otel_debug;
74use opentelemetry::{otel_info, InstrumentationScope};
75use std::borrow::Cow;
76use std::sync::atomic::{AtomicBool, Ordering};
77use std::sync::{Arc, OnceLock};
78use std::time::Duration;
79
80static PROVIDER_RESOURCE: OnceLock<Resource> = OnceLock::new();
81
82static NOOP_TRACER_PROVIDER: OnceLock<SdkTracerProvider> = OnceLock::new();
85#[inline]
86fn noop_tracer_provider() -> &'static SdkTracerProvider {
87 NOOP_TRACER_PROVIDER.get_or_init(|| {
88 SdkTracerProvider {
89 inner: Arc::new(TracerProviderInner {
90 processors: Vec::new(),
91 config: Config {
92 sampler: Box::new(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
94 id_generator: Box::<RandomIdGenerator>::default(),
95 span_limits: SpanLimits::default(),
96 resource: Cow::Owned(Resource::empty()),
97 },
98 is_shutdown: AtomicBool::new(true),
99 }),
100 }
101 })
102}
103
104#[derive(Debug)]
106pub(crate) struct TracerProviderInner {
107 processors: Vec<Box<dyn SpanProcessor>>,
108 config: crate::trace::Config,
109 is_shutdown: AtomicBool,
110}
111
112impl TracerProviderInner {
113 pub(crate) fn shutdown_with_timeout(&self, timeout: Duration) -> Vec<OTelSdkResult> {
116 let mut results = vec![];
117 for processor in &self.processors {
118 let result = processor.shutdown_with_timeout(timeout);
119 if let Err(err) = &result {
120 otel_debug!(name: "TracerProvider.Drop.ShutdownError",
125 error = format!("{err}"));
126 }
127 results.push(result);
128 }
129 results
130 }
131 pub(crate) fn shutdown(&self) -> Vec<OTelSdkResult> {
133 self.shutdown_with_timeout(Duration::from_secs(5))
134 }
135}
136
137impl Drop for TracerProviderInner {
138 fn drop(&mut self) {
139 if !self.is_shutdown.load(Ordering::Relaxed) {
140 let _ = self.shutdown(); } else {
142 otel_debug!(
143 name: "TracerProvider.Drop.AlreadyShutdown",
144 message = "TracerProvider was already shut down; drop will not attempt shutdown again."
145 );
146 }
147 }
148}
149
150#[derive(Clone, Debug)]
158pub struct SdkTracerProvider {
159 inner: Arc<TracerProviderInner>,
160}
161
162impl Default for SdkTracerProvider {
163 fn default() -> Self {
164 SdkTracerProvider::builder().build()
165 }
166}
167
168impl SdkTracerProvider {
169 pub(crate) fn new(inner: TracerProviderInner) -> Self {
171 SdkTracerProvider {
172 inner: Arc::new(inner),
173 }
174 }
175
176 pub fn builder() -> TracerProviderBuilder {
178 TracerProviderBuilder::default()
179 }
180
181 pub(crate) fn span_processors(&self) -> &[Box<dyn SpanProcessor>] {
183 &self.inner.processors
184 }
185
186 pub(crate) fn config(&self) -> &crate::trace::Config {
188 &self.inner.config
189 }
190
191 pub(crate) fn is_shutdown(&self) -> bool {
194 self.inner.is_shutdown.load(Ordering::Relaxed)
195 }
196
197 pub fn force_flush(&self) -> OTelSdkResult {
231 let result: Vec<_> = self
232 .span_processors()
233 .iter()
234 .map(|processor| processor.force_flush())
235 .collect();
236 if result.iter().all(|r| r.is_ok()) {
237 Ok(())
238 } else {
239 Err(OTelSdkError::InternalFailure(format!("errs: {result:?}")))
240 }
241 }
242
243 pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
247 if self
248 .inner
249 .is_shutdown
250 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
251 .is_ok()
252 {
253 let results = self.inner.shutdown_with_timeout(timeout);
255
256 if results.iter().all(|res| res.is_ok()) {
257 Ok(())
258 } else {
259 Err(OTelSdkError::InternalFailure(format!(
260 "Shutdown errors: {:?}",
261 results
262 .into_iter()
263 .filter_map(Result::err)
264 .collect::<Vec<_>>() )))
266 }
267 } else {
268 Err(OTelSdkError::AlreadyShutdown)
269 }
270 }
271
272 pub fn shutdown(&self) -> OTelSdkResult {
274 self.shutdown_with_timeout(Duration::from_secs(5))
275 }
276}
277
278impl opentelemetry::trace::TracerProvider for SdkTracerProvider {
279 type Tracer = SdkTracer;
281
282 fn tracer(&self, name: impl Into<Cow<'static, str>>) -> Self::Tracer {
283 let scope = InstrumentationScope::builder(name).build();
284 self.tracer_with_scope(scope)
285 }
286
287 fn tracer_with_scope(&self, scope: InstrumentationScope) -> Self::Tracer {
288 if self.inner.is_shutdown.load(Ordering::Relaxed) {
289 return SdkTracer::new(scope, noop_tracer_provider().clone());
290 }
291 if scope.name().is_empty() {
292 otel_info!(name: "TracerNameEmpty", message = "Tracer name is empty; consider providing a meaningful name. Tracer will function normally and the provided name will be used as-is.");
293 };
294 SdkTracer::new(scope, self.clone())
295 }
296}
297
298#[derive(Debug, Default)]
300pub struct TracerProviderBuilder {
301 processors: Vec<Box<dyn SpanProcessor>>,
302 config: crate::trace::Config,
303 resource: Option<Resource>,
304}
305
306impl TracerProviderBuilder {
307 pub fn with_simple_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
319 let simple = SimpleSpanProcessor::new(exporter);
320 self.with_span_processor(simple)
321 }
322
323 pub fn with_batch_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
335 let batch = BatchSpanProcessor::builder(exporter).build();
336 self.with_span_processor(batch)
337 }
338
339 pub fn with_span_processor<T: SpanProcessor + 'static>(self, processor: T) -> Self {
351 let mut processors = self.processors;
352 processors.push(Box::new(processor));
353
354 TracerProviderBuilder { processors, ..self }
355 }
356
357 pub fn with_sampler<T: crate::trace::ShouldSample + 'static>(mut self, sampler: T) -> Self {
359 self.config.sampler = Box::new(sampler);
360 self
361 }
362
363 pub fn with_id_generator<T: IdGenerator + 'static>(mut self, id_generator: T) -> Self {
365 self.config.id_generator = Box::new(id_generator);
366 self
367 }
368
369 pub fn with_max_events_per_span(mut self, max_events: u32) -> Self {
371 self.config.span_limits.max_events_per_span = max_events;
372 self
373 }
374
375 pub fn with_max_attributes_per_span(mut self, max_attributes: u32) -> Self {
377 self.config.span_limits.max_attributes_per_span = max_attributes;
378 self
379 }
380
381 pub fn with_max_links_per_span(mut self, max_links: u32) -> Self {
383 self.config.span_limits.max_links_per_span = max_links;
384 self
385 }
386
387 pub fn with_max_attributes_per_event(mut self, max_attributes: u32) -> Self {
389 self.config.span_limits.max_attributes_per_event = max_attributes;
390 self
391 }
392
393 pub fn with_max_attributes_per_link(mut self, max_attributes: u32) -> Self {
395 self.config.span_limits.max_attributes_per_link = max_attributes;
396 self
397 }
398
399 pub fn with_span_limits(mut self, span_limits: SpanLimits) -> Self {
401 self.config.span_limits = span_limits;
402 self
403 }
404
405 pub fn with_resource(self, resource: Resource) -> Self {
417 let resource = match self.resource {
418 Some(existing) => Some(existing.merge(&resource)),
419 None => Some(resource),
420 };
421
422 TracerProviderBuilder { resource, ..self }
423 }
424
425 pub fn build(self) -> SdkTracerProvider {
427 let mut config = self.config;
428
429 if let Some(resource) = self.resource {
431 config.resource = Cow::Owned(resource);
432 };
433
434 if matches!(config.resource, Cow::Owned(_)) {
441 config.resource =
442 match PROVIDER_RESOURCE.get_or_init(|| config.resource.clone().into_owned()) {
443 static_resource if *static_resource == *config.resource.as_ref() => {
444 Cow::Borrowed(static_resource)
445 }
446 _ => config.resource, };
448 }
449
450 let mut processors = self.processors;
452
453 for p in &mut processors {
455 p.set_resource(config.resource.as_ref());
456 }
457
458 let is_shutdown = AtomicBool::new(false);
459 SdkTracerProvider::new(TracerProviderInner {
460 processors,
461 config,
462 is_shutdown,
463 })
464 }
465}
466
467#[cfg(test)]
468mod tests {
469 use crate::error::{OTelSdkError, OTelSdkResult};
470 use crate::resource::{
471 SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
472 };
473 use crate::trace::provider::TracerProviderInner;
474 use crate::trace::{Config, Span, SpanProcessor};
475 use crate::trace::{SdkTracerProvider, SpanData};
476 use crate::Resource;
477 use opentelemetry::trace::{Tracer, TracerProvider};
478 use opentelemetry::{Context, Key, KeyValue, Value};
479
480 use std::env;
481 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
482 use std::sync::Arc;
483 use std::time::Duration;
484
485 #[derive(Default, Debug)]
487 struct AssertInfo {
488 started_span: AtomicU32,
489 is_shutdown: AtomicBool,
490 }
491
492 #[derive(Default, Debug, Clone)]
493 struct SharedAssertInfo(Arc<AssertInfo>);
494
495 impl SharedAssertInfo {
496 fn started_span_count(&self, count: u32) -> bool {
497 self.0.started_span.load(Ordering::SeqCst) == count
498 }
499 }
500
501 #[derive(Debug)]
502 struct TestSpanProcessor {
503 success: bool,
504 assert_info: SharedAssertInfo,
505 }
506
507 impl TestSpanProcessor {
508 fn new(success: bool) -> TestSpanProcessor {
509 TestSpanProcessor {
510 success,
511 assert_info: SharedAssertInfo::default(),
512 }
513 }
514
515 fn assert_info(&self) -> SharedAssertInfo {
517 self.assert_info.clone()
518 }
519 }
520
521 impl SpanProcessor for TestSpanProcessor {
522 fn on_start(&self, _span: &mut Span, _cx: &Context) {
523 self.assert_info
524 .0
525 .started_span
526 .fetch_add(1, Ordering::SeqCst);
527 }
528
529 fn on_end(&self, _span: SpanData) {
530 }
532
533 fn force_flush(&self) -> OTelSdkResult {
534 if self.success {
535 Ok(())
536 } else {
537 Err(OTelSdkError::InternalFailure("cannot export".into()))
538 }
539 }
540
541 fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
542 if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) {
543 Ok(())
544 } else {
545 let _ = self.assert_info.0.is_shutdown.compare_exchange(
546 false,
547 true,
548 Ordering::SeqCst,
549 Ordering::SeqCst,
550 );
551 self.force_flush()
552 }
553 }
554 }
555
556 #[test]
557 fn test_force_flush() {
558 let tracer_provider = super::SdkTracerProvider::new(TracerProviderInner {
559 processors: vec![
560 Box::from(TestSpanProcessor::new(true)),
561 Box::from(TestSpanProcessor::new(false)),
562 ],
563 config: Default::default(),
564 is_shutdown: AtomicBool::new(false),
565 });
566
567 let results = tracer_provider.force_flush();
568 assert!(results.is_err());
569 }
570
571 #[test]
572 fn test_tracer_provider_default_resource() {
573 let assert_resource = |provider: &super::SdkTracerProvider,
574 resource_key: &'static str,
575 expect: Option<&'static str>| {
576 assert_eq!(
577 provider
578 .config()
579 .resource
580 .get(&Key::from_static_str(resource_key))
581 .map(|v| v.to_string()),
582 expect.map(|s| s.to_string())
583 );
584 };
585 let assert_telemetry_resource = |provider: &super::SdkTracerProvider| {
586 assert_eq!(
587 provider
588 .config()
589 .resource
590 .get(&TELEMETRY_SDK_LANGUAGE.into()),
591 Some(Value::from("rust"))
592 );
593 assert_eq!(
594 provider.config().resource.get(&TELEMETRY_SDK_NAME.into()),
595 Some(Value::from("opentelemetry"))
596 );
597 assert_eq!(
598 provider
599 .config()
600 .resource
601 .get(&TELEMETRY_SDK_VERSION.into()),
602 Some(Value::from(env!("CARGO_PKG_VERSION")))
603 );
604 };
605
606 temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
608 let default_config_provider = super::SdkTracerProvider::builder().build();
609 assert_resource(
610 &default_config_provider,
611 SERVICE_NAME,
612 Some("unknown_service"),
613 );
614 assert_telemetry_resource(&default_config_provider);
615 });
616
617 let custom_config_provider = super::SdkTracerProvider::builder()
619 .with_resource(
620 Resource::builder_empty()
621 .with_service_name("test_service")
622 .build(),
623 )
624 .build();
625 assert_resource(&custom_config_provider, SERVICE_NAME, Some("test_service"));
626 assert_eq!(custom_config_provider.config().resource.len(), 1);
627
628 temp_env::with_var(
630 "OTEL_RESOURCE_ATTRIBUTES",
631 Some("key1=value1, k2, k3=value2"),
632 || {
633 let env_resource_provider = super::SdkTracerProvider::builder().build();
634 assert_resource(
635 &env_resource_provider,
636 SERVICE_NAME,
637 Some("unknown_service"),
638 );
639 assert_resource(&env_resource_provider, "key1", Some("value1"));
640 assert_resource(&env_resource_provider, "k3", Some("value2"));
641 assert_telemetry_resource(&env_resource_provider);
642 assert_eq!(env_resource_provider.config().resource.len(), 6);
643 },
644 );
645
646 temp_env::with_var(
648 "OTEL_RESOURCE_ATTRIBUTES",
649 Some("my-custom-key=env-val,k2=value2"),
650 || {
651 let user_provided_resource_config_provider = super::SdkTracerProvider::builder()
652 .with_resource(
653 Resource::builder()
654 .with_attributes([
655 KeyValue::new("my-custom-key", "my-custom-value"),
656 KeyValue::new("my-custom-key2", "my-custom-value2"),
657 ])
658 .build(),
659 )
660 .build();
661 assert_resource(
662 &user_provided_resource_config_provider,
663 SERVICE_NAME,
664 Some("unknown_service"),
665 );
666 assert_resource(
667 &user_provided_resource_config_provider,
668 "my-custom-key",
669 Some("my-custom-value"),
670 );
671 assert_resource(
672 &user_provided_resource_config_provider,
673 "my-custom-key2",
674 Some("my-custom-value2"),
675 );
676 assert_resource(
677 &user_provided_resource_config_provider,
678 "k2",
679 Some("value2"),
680 );
681 assert_telemetry_resource(&user_provided_resource_config_provider);
682 assert_eq!(
683 user_provided_resource_config_provider
684 .config()
685 .resource
686 .len(),
687 7
688 );
689 },
690 );
691
692 let no_service_name = super::SdkTracerProvider::builder()
694 .with_resource(Resource::empty())
695 .build();
696
697 assert_eq!(no_service_name.config().resource.len(), 0)
698 }
699
700 #[test]
701 fn test_shutdown_noops() {
702 let processor = TestSpanProcessor::new(false);
703 let assert_handle = processor.assert_info();
704 let tracer_provider = super::SdkTracerProvider::new(TracerProviderInner {
705 processors: vec![Box::from(processor)],
706 config: Default::default(),
707 is_shutdown: AtomicBool::new(false),
708 });
709
710 let test_tracer_1 = tracer_provider.tracer("test1");
711 let _ = test_tracer_1.start("test");
712
713 assert!(assert_handle.started_span_count(1));
714
715 let _ = test_tracer_1.start("test");
716
717 assert!(assert_handle.started_span_count(2));
718
719 let shutdown = |tracer_provider: super::SdkTracerProvider| {
720 let _ = tracer_provider.shutdown(); };
722
723 shutdown(tracer_provider.clone());
725
726 let noop_tracer = tracer_provider.tracer("noop");
728
729 let _ = noop_tracer.start("test");
731 assert!(assert_handle.started_span_count(2));
732 assert!(noop_tracer.provider().is_shutdown());
734
735 let _ = test_tracer_1.start("test");
737 assert!(assert_handle.started_span_count(2));
738
739 assert!(test_tracer_1.provider().is_shutdown());
741 }
742
743 #[test]
744 fn with_resource_multiple_calls_ensure_additive() {
745 let resource = SdkTracerProvider::builder()
746 .with_resource(Resource::new(vec![KeyValue::new("key1", "value1")]))
747 .with_resource(Resource::new(vec![KeyValue::new("key2", "value2")]))
748 .with_resource(
749 Resource::builder_empty()
750 .with_schema_url(vec![], "http://example.com")
751 .build(),
752 )
753 .with_resource(Resource::new(vec![KeyValue::new("key3", "value3")]))
754 .build()
755 .inner
756 .config
757 .resource
758 .clone()
759 .into_owned();
760
761 assert_eq!(
762 resource.get(&Key::from_static_str("key1")),
763 Some(Value::from("value1"))
764 );
765 assert_eq!(
766 resource.get(&Key::from_static_str("key2")),
767 Some(Value::from("value2"))
768 );
769 assert_eq!(
770 resource.get(&Key::from_static_str("key3")),
771 Some(Value::from("value3"))
772 );
773 assert_eq!(resource.schema_url(), Some("http://example.com"));
774 }
775
776 #[derive(Debug)]
777 struct CountingShutdownProcessor {
778 shutdown_count: Arc<AtomicU32>,
779 }
780
781 impl CountingShutdownProcessor {
782 fn new(shutdown_count: Arc<AtomicU32>) -> Self {
783 CountingShutdownProcessor { shutdown_count }
784 }
785 }
786
787 impl SpanProcessor for CountingShutdownProcessor {
788 fn on_start(&self, _span: &mut Span, _cx: &Context) {
789 }
791
792 fn on_end(&self, _span: SpanData) {
793 }
795
796 fn force_flush(&self) -> OTelSdkResult {
797 Ok(())
798 }
799
800 fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
801 self.shutdown_count.fetch_add(1, Ordering::SeqCst);
802 Ok(())
803 }
804 }
805
806 #[test]
807 fn drop_test_with_multiple_providers() {
808 let shutdown_count = Arc::new(AtomicU32::new(0));
809
810 {
811 let shared_inner = Arc::new(TracerProviderInner {
813 processors: vec![Box::new(CountingShutdownProcessor::new(
814 shutdown_count.clone(),
815 ))],
816 config: Config::default(),
817 is_shutdown: AtomicBool::new(false),
818 });
819
820 {
821 let tracer_provider1 = super::SdkTracerProvider {
822 inner: shared_inner.clone(),
823 };
824 let tracer_provider2 = super::SdkTracerProvider {
825 inner: shared_inner.clone(),
826 };
827
828 let tracer1 = tracer_provider1.tracer("test-tracer1");
829 let tracer2 = tracer_provider2.tracer("test-tracer2");
830
831 let _span1 = tracer1.start("span1");
832 let _span2 = tracer2.start("span2");
833
834 }
837 assert_eq!(shutdown_count.load(Ordering::SeqCst), 0);
840 }
841 assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
843 }
844
845 #[test]
846 fn drop_after_shutdown_test_with_multiple_providers() {
847 let shutdown_count = Arc::new(AtomicU32::new(0));
848
849 let shared_inner = Arc::new(TracerProviderInner {
851 processors: vec![Box::new(CountingShutdownProcessor::new(
852 shutdown_count.clone(),
853 ))],
854 config: Config::default(),
855 is_shutdown: AtomicBool::new(false),
856 });
857
858 {
860 let tracer_provider1 = super::SdkTracerProvider {
861 inner: shared_inner.clone(),
862 };
863 let tracer_provider2 = super::SdkTracerProvider {
864 inner: shared_inner.clone(),
865 };
866
867 let shutdown_result = tracer_provider1.shutdown();
869 assert!(shutdown_result.is_ok());
870
871 assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
873
874 let shutdown_result2 = tracer_provider2.shutdown();
876 assert!(shutdown_result2.is_err());
877 assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
878
879 }
881
882 assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
884 }
885}