#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use crate::runtime::RuntimeChannel;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use crate::trace::SpanExporter;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use crate::{error::OTelSdkResult, runtime};
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use opentelemetry::global::*;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use opentelemetry::trace::Tracer;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use std::fmt::Debug;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use std::sync::Arc;
#[derive(Debug)]
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
struct SpanCountExporter {
span_count: Arc<AtomicUsize>,
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
impl SpanExporter for SpanCountExporter {
async fn export(&self, batch: Vec<crate::trace::SpanData>) -> OTelSdkResult {
self.span_count.fetch_add(batch.len(), Ordering::SeqCst);
Ok(())
}
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
impl SpanCountExporter {
fn new() -> SpanCountExporter {
SpanCountExporter {
span_count: Arc::new(AtomicUsize::new(0)),
}
}
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
fn build_batch_tracer_provider<R: RuntimeChannel>(
exporter: SpanCountExporter,
runtime: R,
) -> crate::trace::SdkTracerProvider {
use crate::trace::SdkTracerProvider;
let processor = crate::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder(
exporter, runtime,
)
.build();
SdkTracerProvider::builder()
.with_span_processor(processor)
.build()
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
fn build_simple_tracer_provider(exporter: SpanCountExporter) -> crate::trace::SdkTracerProvider {
use crate::trace::SdkTracerProvider;
SdkTracerProvider::builder()
.with_simple_exporter(exporter)
.build()
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
async fn test_set_provider_in_tokio<R: RuntimeChannel>(
runtime: R,
) -> (Arc<AtomicUsize>, crate::trace::SdkTracerProvider) {
let exporter = SpanCountExporter::new();
let span_count = exporter.span_count.clone();
let tracer_provider = build_batch_tracer_provider(exporter, runtime);
set_tracer_provider(tracer_provider.clone());
let tracer = tracer("opentelemetery");
tracer.in_span("test", |_cx| {});
(span_count, tracer_provider)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires --test-threads=1"]
#[cfg(feature = "rt-tokio")]
async fn test_set_provider_multiple_thread_tokio() {
let (span_count, _) = test_set_provider_in_tokio(runtime::Tokio).await;
assert_eq!(span_count.load(Ordering::SeqCst), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires --test-threads=1"]
#[cfg(feature = "rt-tokio")]
async fn test_set_provider_multiple_thread_tokio_shutdown() {
let (span_count, tracer_provider) = test_set_provider_in_tokio(runtime::Tokio).await;
tracer_provider
.shutdown()
.expect("TracerProvider should shutdown properly");
assert!(span_count.load(Ordering::SeqCst) > 0);
}
#[tokio::test]
#[ignore = "requires --test-threads=1"]
#[cfg(feature = "rt-tokio")]
async fn test_set_provider_single_thread_tokio_with_simple_processor() {
let exporter = SpanCountExporter::new();
let span_count = exporter.span_count.clone();
let tracer_provider = build_simple_tracer_provider(exporter);
set_tracer_provider(tracer_provider.clone());
let tracer = tracer("opentelemetry");
tracer.in_span("test", |_cx| {});
tracer_provider
.shutdown()
.expect("TracerProvider should shutdown properly");
assert!(span_count.load(Ordering::SeqCst) > 0);
}
#[tokio::test]
#[ignore = "requires --test-threads=1"]
#[cfg(feature = "rt-tokio-current-thread")]
async fn test_set_provider_single_thread_tokio() {
let (span_count, _) = test_set_provider_in_tokio(runtime::TokioCurrentThread).await;
assert_eq!(span_count.load(Ordering::SeqCst), 0)
}
#[tokio::test]
#[ignore = "requires --test-threads=1"]
#[cfg(feature = "rt-tokio-current-thread")]
async fn test_set_provider_single_thread_tokio_shutdown() {
let (span_count, tracer_provider) =
test_set_provider_in_tokio(runtime::TokioCurrentThread).await;
tracer_provider
.shutdown()
.expect("TracerProvider should shutdown properly");
assert!(span_count.load(Ordering::SeqCst) > 0)
}