use crate::trace::NoopTracerProvider;
use crate::{trace, trace::TracerProvider, Context, KeyValue};
use std::fmt;
use std::mem;
use std::sync::{Arc, RwLock};
use std::time::SystemTime;
/// Type-erased span: a newtype around a boxed [`trace::Span`] trait object.
///
/// This is the `Span` type produced by [`BoxedTracer`], so callers do not have
/// to name the concrete span type of the tracer that created it.
#[derive(Debug)]
pub struct BoxedSpan(Box<DynSpan>);
/// Shorthand for the `Send + Sync` span trait object stored in boxed form by
/// this module.
type DynSpan = dyn trace::Span + Send + Sync;
impl trace::Span for BoxedSpan {
fn add_event_with_timestamp(
&self,
name: String,
timestamp: SystemTime,
attributes: Vec<KeyValue>,
) {
self.0.add_event_with_timestamp(name, timestamp, attributes)
}
fn span_context(&self) -> &trace::SpanContext {
self.0.span_context()
}
fn is_recording(&self) -> bool {
self.0.is_recording()
}
fn set_attribute(&self, attribute: KeyValue) {
self.0.set_attribute(attribute)
}
fn set_status(&self, code: trace::StatusCode, message: String) {
self.0.set_status(code, message)
}
fn update_name(&self, new_name: String) {
self.0.update_name(new_name)
}
fn end_with_timestamp(&self, timestamp: SystemTime) {
self.0.end_with_timestamp(timestamp);
}
}
/// Type-erased tracer: a newtype around a boxed [`GenericTracer`] trait object.
///
/// Its [`trace::Tracer`] implementation yields [`BoxedSpan`] values.
#[derive(Debug)]
pub struct BoxedTracer(Box<dyn GenericTracer + Send + Sync>);
/// Implements [`trace::Tracer`] by calling the boxed tracer's `*_boxed`
/// methods and wrapping each returned span in a [`BoxedSpan`].
impl trace::Tracer for BoxedTracer {
    type Span = BoxedSpan;

    /// Returns the wrapped tracer's invalid span, re-wrapped as a `BoxedSpan`.
    fn invalid(&self) -> Self::Span {
        let span = self.0.invalid_boxed();
        BoxedSpan(span)
    }

    /// Starts a span named `name` in context `cx` on the wrapped tracer.
    fn start_with_context(&self, name: &str, cx: Context) -> Self::Span {
        let span = self.0.start_with_context_boxed(name, cx);
        BoxedSpan(span)
    }

    /// Creates a span builder from the name alone; the wrapped tracer is not
    /// consulted here.
    fn span_builder(&self, name: &str) -> trace::SpanBuilder {
        let owned_name = String::from(name);
        trace::SpanBuilder::from_name(owned_name)
    }

    /// Builds a span from `builder` on the wrapped tracer.
    fn build(&self, builder: trace::SpanBuilder) -> Self::Span {
        let span = self.0.build_boxed(builder);
        BoxedSpan(span)
    }
}
/// Tracer interface whose methods return boxed, type-erased spans rather than
/// an associated span type, so it can be used as `dyn GenericTracer`.
///
/// A blanket implementation in this file covers every [`trace::Tracer`] whose
/// span type is `Send + Sync`.
pub trait GenericTracer: fmt::Debug + 'static {
/// Returns the tracer's invalid span as a boxed trait object.
fn invalid_boxed(&self) -> Box<DynSpan>;
/// Starts a span named `name` in context `cx` and returns it boxed.
fn start_with_context_boxed(&self, name: &str, cx: Context) -> Box<DynSpan>;
/// Builds a span from `builder` and returns it boxed.
fn build_boxed(&self, builder: trace::SpanBuilder) -> Box<DynSpan>;
}
/// Blanket implementation: any [`trace::Tracer`] whose span type is
/// `Send + Sync` is a [`GenericTracer`]; each method calls the matching
/// `Tracer` method and boxes the resulting span.
impl<S, T> GenericTracer for T
where
    S: trace::Span + Send + Sync,
    T: trace::Tracer<Span = S>,
{
    /// Boxes the span returned by `Tracer::invalid`.
    fn invalid_boxed(&self) -> Box<DynSpan> {
        let span = self.invalid();
        Box::new(span)
    }

    /// Boxes the span returned by `Tracer::start_with_context`.
    fn start_with_context_boxed(&self, name: &str, cx: Context) -> Box<DynSpan> {
        let span = self.start_with_context(name, cx);
        Box::new(span)
    }

    /// Boxes the span returned by `Tracer::build`.
    fn build_boxed(&self, builder: trace::SpanBuilder) -> Box<DynSpan> {
        let span = self.build(builder);
        Box::new(span)
    }
}
/// Provider interface that hands out boxed [`GenericTracer`] trait objects
/// rather than an associated tracer type, so it can be used as
/// `dyn GenericTracerProvider`.
pub trait GenericTracerProvider: fmt::Debug + 'static {
/// Returns a boxed tracer for the given instrumentation `name` and optional
/// `version`.
fn get_tracer_boxed(
&self,
name: &'static str,
version: Option<&'static str>,
) -> Box<dyn GenericTracer + Send + Sync>;
}
/// Blanket implementation: any [`trace::TracerProvider`] whose tracer is
/// `Send + Sync` (with a `Send + Sync` span type) is a
/// [`GenericTracerProvider`].
impl<S, T, P> GenericTracerProvider for P
where
    S: trace::Span + Send + Sync,
    T: trace::Tracer<Span = S> + Send + Sync,
    P: trace::TracerProvider<Tracer = T>,
{
    /// Fetches the concrete tracer from the provider and boxes it.
    fn get_tracer_boxed(
        &self,
        name: &'static str,
        version: Option<&'static str>,
    ) -> Box<dyn GenericTracer + Send + Sync> {
        let concrete_tracer = self.get_tracer(name, version);
        Box::new(concrete_tracer)
    }
}
/// Cloneable handle to a type-erased tracer provider.
///
/// This is the type stored in the global provider slot and returned by
/// `tracer_provider` / `set_tracer_provider`.
#[derive(Clone, Debug)]
pub struct GlobalTracerProvider {
// Shared, type-erased provider; cloning the handle clones this `Arc`.
provider: Arc<dyn GenericTracerProvider + Send + Sync>,
}
impl GlobalTracerProvider {
fn new<P, T, S>(provider: P) -> Self
where
S: trace::Span + Send + Sync,
T: trace::Tracer<Span = S> + Send + Sync,
P: trace::TracerProvider<Tracer = T> + Send + Sync,
{
GlobalTracerProvider {
provider: Arc::new(provider),
}
}
}
/// Lets the type-erased handle be used wherever a [`trace::TracerProvider`]
/// is expected; tracers come back as [`BoxedTracer`].
impl trace::TracerProvider for GlobalTracerProvider {
    type Tracer = BoxedTracer;

    /// Asks the wrapped provider for a boxed tracer and wraps it in a
    /// `BoxedTracer`.
    fn get_tracer(&self, name: &'static str, version: Option<&'static str>) -> Self::Tracer {
        let boxed = self.provider.get_tracer_boxed(name, version);
        BoxedTracer(boxed)
    }
}
// Process-wide provider slot, guarded by an `RwLock`.
// It starts out holding a `NoopTracerProvider` wrapped in a
// `GlobalTracerProvider`; `set_tracer_provider` and `shutdown_tracer_provider`
// replace the stored value under the write lock, and `tracer_provider` clones
// it under the read lock.
lazy_static::lazy_static! {
static ref GLOBAL_TRACER_PROVIDER: RwLock<GlobalTracerProvider> = RwLock::new(GlobalTracerProvider::new(trace::NoopTracerProvider::new()));
}
pub fn tracer_provider() -> GlobalTracerProvider {
GLOBAL_TRACER_PROVIDER
.read()
.expect("GLOBAL_TRACER_PROVIDER RwLock poisoned")
.clone()
}
/// Gets a tracer named `name`, with no version, from the current global
/// provider.
///
/// # Panics
///
/// Panics if the lock guarding the global provider has been poisoned.
pub fn tracer(name: &'static str) -> BoxedTracer {
    let provider = tracer_provider();
    provider.get_tracer(name, None)
}
/// Gets a tracer named `name` with the given `version` from the current
/// global provider.
///
/// # Panics
///
/// Panics if the lock guarding the global provider has been poisoned.
pub fn tracer_with_version(name: &'static str, version: &'static str) -> BoxedTracer {
    let provider = tracer_provider();
    provider.get_tracer(name, Some(version))
}
/// Installs `new_provider` as the global provider and returns the handle that
/// was installed before.
///
/// The return value is `#[must_use]`: the caller decides when the previous
/// handle is dropped.
///
/// # Panics
///
/// Panics if the lock guarding the global provider has been poisoned.
#[must_use]
pub fn set_tracer_provider<P, T, S>(new_provider: P) -> GlobalTracerProvider
where
    S: trace::Span + Send + Sync,
    T: trace::Tracer<Span = S> + Send + Sync,
    P: trace::TracerProvider<Tracer = T> + Send + Sync,
{
    let mut slot = GLOBAL_TRACER_PROVIDER
        .write()
        .expect("GLOBAL_TRACER_PROVIDER RwLock poisoned");
    let replacement = GlobalTracerProvider::new(new_provider);
    // Swap the new handle in and give the old one back to the caller.
    mem::replace(&mut *slot, replacement)
}
/// Replaces the global provider with a fresh `NoopTracerProvider` and drops
/// the handle that was installed before.
///
/// The previous handle is dropped while the write lock is still held. If it
/// was the last handle to its provider, that provider is dropped here.
///
/// # Panics
///
/// Panics if the lock guarding the global provider has been poisoned.
pub fn shutdown_tracer_provider() {
    let mut slot = GLOBAL_TRACER_PROVIDER
        .write()
        .expect("GLOBAL_TRACER_PROVIDER RwLock poisoned");
    let noop = GlobalTracerProvider::new(NoopTracerProvider::new());
    let previous = mem::replace(&mut *slot, noop);
    drop(previous);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
runtime::{self, Runtime},
trace::{NoopTracer, Tracer},
};
use std::{
fmt::Debug,
io::Write,
sync::Mutex,
thread::{self, sleep},
time::Duration,
};
/// In-memory writer used by the tests to observe how many bytes were written.
///
/// Clones share one buffer, so a test can keep one clone and hand another to
/// the code under test.
#[derive(Clone, Debug)]
struct AssertWriter {
    buf: Arc<Mutex<Vec<u8>>>,
}

impl AssertWriter {
    /// Creates a writer backed by a new, empty shared buffer.
    fn new() -> AssertWriter {
        AssertWriter {
            buf: Arc::default(),
        }
    }

    /// Number of bytes written so far through any clone of this writer.
    fn len(&self) -> usize {
        self.locked().len()
    }

    /// Locks the shared buffer, panicking if the mutex is poisoned.
    fn locked(&self) -> std::sync::MutexGuard<'_, Vec<u8>> {
        self.buf
            .lock()
            .expect("cannot acquire the lock of assert writer")
    }
}

impl Write for AssertWriter {
    /// Appends `buf` to the shared buffer via `Vec<u8>`'s `Write` impl.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.locked().write(buf)
    }

    /// Delegates to `Vec<u8>`'s `flush`.
    fn flush(&mut self) -> std::io::Result<()> {
        self.locked().flush()
    }
}
/// Provider stub for the tests. The only thing that distinguishes one instance
/// from another is `debug_msg`, which shows up in the `Debug` output that the
/// tests inspect.
#[derive(Debug, Default)]
struct TestTracerProvider {
    debug_msg: &'static str,
}

impl TestTracerProvider {
    /// Creates a provider tagged with `debug_msg`.
    fn new(debug_msg: &'static str) -> Self {
        Self { debug_msg }
    }
}
/// The test provider always hands out a default `NoopTracer`, whatever name or
/// version is requested.
impl TracerProvider for TestTracerProvider {
    type Tracer = NoopTracer;

    fn get_tracer(&self, _name: &'static str, _version: Option<&'static str>) -> Self::Tracer {
        <NoopTracer as Default>::default()
    }
}
/// The most recently installed provider is the one `tracer_provider` reports,
/// and it stays installed after the block that installed it has ended.
#[test]
#[ignore = "requires --test-threads=1"]
fn test_set_tracer_provider() {
    let describe_global = || format!("{:?}", tracer_provider());

    drop(set_tracer_provider(TestTracerProvider::new("global one")));
    {
        drop(set_tracer_provider(TestTracerProvider::new("inner one")));
        assert!(describe_global().contains("inner one"));
    }
    // Leaving the block does not restore "global one".
    assert!(describe_global().contains("inner one"));
}
/// A provider installed on the main thread must be visible from a freshly
/// spawned thread.
#[test]
#[ignore = "requires --test-threads=1"]
fn test_set_tracer_provider_in_another_thread() {
    let _ = set_tracer_provider(TestTracerProvider::new("global one"));
    let handle = thread::spawn(move || {
        assert!(format!("{:?}", tracer_provider()).contains("global one"));
    });
    println!("{:?}", tracer_provider());
    // `join` returns `Err` when the spawned thread panicked. Discarding that
    // result would let a failed assertion inside the thread go unnoticed, so
    // propagate it as a test failure.
    handle
        .join()
        .expect("spawned thread should observe the provider installed by the main thread");
}
/// A provider installed inside a helper stays installed after the helper
/// returns.
#[test]
#[ignore = "requires --test-threads=1"]
fn test_set_tracer_provider_in_another_function() {
    fn install_and_check() {
        drop(set_tracer_provider(TestTracerProvider::new("global one")));
        let seen_inside = format!("{:?}", tracer_provider());
        assert!(seen_inside.contains("global one"));
    }

    install_and_check();
    let seen_by_caller = format!("{:?}", tracer_provider());
    assert!(seen_by_caller.contains("global one"));
}
/// Two threads install providers one after the other; once both have done so,
/// each thread must observe the provider that was installed last ("thread 2").
///
/// Timeline set by the sleeps: thread 1 installs at ~1s, thread 2 installs at
/// ~2s, and both report what they see at ~3s.
#[test]
#[ignore = "requires --test-threads=1"]
fn test_set_two_provider_in_two_thread() {
    let (sender, recv) = std::sync::mpsc::channel();
    let (sender1, sender2) = (sender.clone(), sender);
    let _handle1 = thread::spawn(move || {
        sleep(Duration::from_secs(1));
        let _previous = set_tracer_provider(TestTracerProvider::new("thread 1"));
        sleep(Duration::from_secs(2));
        // The reporter label is sent separately from the provider's debug
        // output, so the label can never satisfy the assertion by itself.
        let _ = sender1.send(("thread 1", format!("{:?}", tracer_provider())));
    });
    let _handle2 = thread::spawn(move || {
        sleep(Duration::from_secs(2));
        let _previous = set_tracer_provider(TestTracerProvider::new("thread 2"));
        sleep(Duration::from_secs(1));
        let _ = sender2.send(("thread 2", format!("{:?}", tracer_provider())));
    });
    // If a thread panics before sending, its sender is dropped and `recv`
    // eventually fails, so the `unwrap` calls also surface thread panics.
    let (first_reporter, first_provider) = recv.recv().unwrap();
    let (second_reporter, second_provider) = recv.recv().unwrap();
    assert!(
        first_provider.contains("thread 2"),
        "{} observed unexpected provider: {}",
        first_reporter,
        first_provider
    );
    assert!(
        second_provider.contains("thread 2"),
        "{} observed unexpected provider: {}",
        second_reporter,
        second_provider
    );
}
/// Builds an SDK tracer provider whose stdout exporter writes into
/// `assert_writer`, installed through the builder's default batch exporter on
/// the given `runtime`.
fn build_batch_tracer_provider<R: Runtime>(
    assert_writer: AssertWriter,
    runtime: R,
) -> crate::sdk::trace::TracerProvider {
    // NOTE(review): the meaning of the `true` flag is not visible from this
    // file (presumably pretty-printing); confirm against the exporter.
    let exporter = crate::sdk::export::trace::stdout::Exporter::new(assert_writer, true);
    let builder = crate::sdk::trace::TracerProvider::builder();
    builder
        .with_default_batch_exporter(exporter, runtime)
        .build()
}
/// Builds an SDK tracer provider whose stdout exporter writes into
/// `assert_writer`, installed through the builder's simple exporter.
fn build_simple_tracer_provider(
    assert_writer: AssertWriter,
) -> crate::sdk::trace::TracerProvider {
    // NOTE(review): the meaning of the `true` flag is not visible from this
    // file (presumably pretty-printing); confirm against the exporter.
    let exporter = crate::sdk::export::trace::stdout::Exporter::new(assert_writer, true);
    let builder = crate::sdk::trace::TracerProvider::builder();
    builder.with_simple_exporter(exporter).build()
}
/// Shared setup for the tokio tests: installs a batch-exporting provider on
/// `runtime`, records one empty span named "test", and returns the writer the
/// exporter was given so the caller can inspect how much was written.
async fn test_set_provider_in_tokio<R: Runtime>(runtime: R) -> AssertWriter {
    let sink = AssertWriter::new();
    let provider = build_batch_tracer_provider(sink.clone(), runtime);
    drop(set_tracer_provider(provider));

    let boxed_tracer = tracer("opentelemetery");
    boxed_tracer.in_span("test", |_cx| {});
    sink
}
/// Multi-threaded runtime, no shutdown: nothing has been written to the
/// writer at the time of the check.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires --test-threads=1"]
async fn test_set_provider_multiple_thread_tokio() {
    let writer = test_set_provider_in_tokio(runtime::Tokio).await;
    let exported_bytes = writer.len();
    assert_eq!(exported_bytes, 0);
}
/// Multi-threaded runtime with shutdown: after `shutdown_tracer_provider`
/// returns, the writer has received output.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires --test-threads=1"]
async fn test_set_provider_multiple_thread_tokio_shutdown() {
    let writer = test_set_provider_in_tokio(runtime::Tokio).await;
    shutdown_tracer_provider();
    let exported_bytes = writer.len();
    assert!(exported_bytes > 0);
}
/// Default tokio test runtime with the simple exporter: after recording one
/// span and shutting down, the writer has received output.
#[tokio::test]
#[ignore = "requires --test-threads=1"]
async fn test_set_provider_single_thread_tokio_with_simple_processor() {
    let writer = AssertWriter::new();
    let provider = build_simple_tracer_provider(writer.clone());
    drop(set_tracer_provider(provider));

    let boxed_tracer = tracer("opentelemetry");
    boxed_tracer.in_span("test", |_cx| {});
    shutdown_tracer_provider();

    let exported_bytes = writer.len();
    assert!(exported_bytes > 0);
}
/// `TokioCurrentThread` runtime, no shutdown: nothing has been written to the
/// writer at the time of the check.
#[tokio::test]
#[ignore = "requires --test-threads=1"]
async fn test_set_provider_single_thread_tokio() {
    let writer = test_set_provider_in_tokio(runtime::TokioCurrentThread).await;
    let exported_bytes = writer.len();
    assert_eq!(exported_bytes, 0)
}
/// `TokioCurrentThread` runtime with shutdown: after
/// `shutdown_tracer_provider` returns, the writer has received output.
#[tokio::test]
#[ignore = "requires --test-threads=1"]
async fn test_set_provider_single_thread_tokio_shutdown() {
    let writer = test_set_provider_in_tokio(runtime::TokioCurrentThread).await;
    shutdown_tracer_provider();
    let exported_bytes = writer.len();
    assert!(exported_bytes > 0);
}
}