#[cfg(feature = "std")]
mod json_exporter;
#[cfg(feature = "std")]
mod test_exporter;
use core::fmt::Debug;
#[cfg(feature = "enable")]
use core::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use core::{error, fmt};
#[cfg(feature = "std")]
pub use json_exporter::ConsoleJsonExporter;
#[cfg(feature = "std")]
#[doc(hidden)]
pub use test_exporter::TestExporter;
use crate::TraceId;
#[cfg(feature = "enable")]
use crate::protocol::ExecutionId;
use crate::protocol::InstanceMessage;
#[cfg(feature = "enable")]
use crate::protocol::{
LogMessage, SpanAddEventMessage, SpanAddLinkMessage, SpanCloseMessage, SpanCreateMessage,
SpanEnterMessage, SpanExitMessage, SpanSetAttributeMessage, TelemetryMessage, TracingMessage,
};
/// Receives the messages that a [`Collector`] forwards.
pub trait Export: Debug {
/// Handles a single `message`.
fn export(&self, message: InstanceMessage<'_>);
}
/// Forwards messages to an [`Export`] implementation.
///
/// All state lives behind the `enable` feature; without it the struct has no fields.
#[derive(Debug)]
pub struct Collector {
// Only present when the `enable` feature is active.
#[cfg(feature = "enable")]
inner: CollectorInner,
}
/// State backing a [`Collector`] when the `enable` feature is active.
#[cfg(feature = "enable")]
#[derive(Debug)]
struct CollectorInner {
/// Identifier placed in the `execution` field of every message this collector builds.
execution_id: ExecutionId,
/// Destination that receives every message.
exporter: &'static (dyn Export + Sync),
/// Upper 64 bits of the raw execution id; when zero, `generate_trace_id` returns `TraceId(0)`.
trace_id_prefix: u64,
/// Counter seeded with the lower 64 bits of the raw execution id and incremented each time
/// `generate_trace_id` runs with a non-zero prefix.
trace_id_counter: AtomicU64,
}
/// Exporter that discards every message it is given.
#[cfg(feature = "enable")]
#[derive(Debug)]
struct NopExporter;
#[cfg(feature = "enable")]
impl Export for NopExporter {
// Intentionally empty: the message is dropped without being inspected.
fn export(&self, _: InstanceMessage) {}
}
/// Collector returned by `get_collector` once `set_exporter` has completed.
///
/// Starts out with the no-op exporter and is overwritten by `set_exporter`, which uses
/// `GLOBAL_INIT` so that only one caller performs the write.
#[cfg(feature = "enable")]
static mut GLOBAL_COLLECTOR: Collector = Collector {
inner: CollectorInner {
execution_id: ExecutionId::from_raw(0),
exporter: &NO_EXPORTER,
// A zero prefix makes `generate_trace_id` return `TraceId(0)`.
trace_id_prefix: 0,
trace_id_counter: AtomicU64::new(0),
},
};
/// Fallback collector returned by `get_collector` when the `enable` feature is off or when
/// `GLOBAL_INIT` is not yet `INITIALIZED`.
///
/// With `enable`, it uses the no-op exporter and a zero trace id prefix, so
/// `generate_trace_id` returns `TraceId(0)`.
static NO_COLLECTOR: Collector = Collector {
#[cfg(feature = "enable")]
inner: CollectorInner {
execution_id: ExecutionId::from_raw(0),
exporter: &NO_EXPORTER,
trace_id_prefix: 0,
trace_id_counter: AtomicU64::new(0),
},
};
/// Shared no-op exporter referenced by the default collectors.
#[cfg(feature = "enable")]
static NO_EXPORTER: NopExporter = NopExporter;
/// Initialisation state of `GLOBAL_COLLECTOR`: one of `UNINITIALIZED`, `INITIALIZING` or
/// `INITIALIZED`. Starts at `0`, i.e. `UNINITIALIZED`.
#[cfg(feature = "enable")]
static GLOBAL_INIT: AtomicUsize = AtomicUsize::new(0);
/// `set_exporter` has not been called yet.
#[cfg(feature = "enable")]
const UNINITIALIZED: usize = 0;
/// A `set_exporter` call has claimed initialisation and is writing `GLOBAL_COLLECTOR`.
#[cfg(feature = "enable")]
const INITIALIZING: usize = 1;
/// `GLOBAL_COLLECTOR` has been written and may be read by `get_collector`.
#[cfg(feature = "enable")]
const INITIALIZED: usize = 2;
/// Installs `exporter` as the process-wide exporter and tags its messages with `execution_id`.
///
/// Only the first call can succeed; the global state never returns to `UNINITIALIZED`.
///
/// # Errors
///
/// Returns [`SetExporterError`] if an earlier call has already claimed initialisation,
/// whether or not that call has finished.
#[cfg(feature = "enable")]
pub fn set_exporter(
    execution_id: ExecutionId,
    exporter: &'static (dyn Export + Sync),
) -> Result<(), SetExporterError> {
    // Claim the one-time initialisation slot: only a caller that observes `UNINITIALIZED`
    // moves the state to `INITIALIZING`.
    let claim = GLOBAL_INIT.compare_exchange(
        UNINITIALIZED,
        INITIALIZING,
        Ordering::Acquire,
        Ordering::Relaxed,
    );
    if claim.is_err() {
        return Err(SetExporterError(()));
    }

    // SAFETY: the successful compare-exchange above lets at most one caller reach this write,
    // and `get_collector` reads `GLOBAL_COLLECTOR` only after it observes `INITIALIZED`,
    // which is published (with `Release`) after the write below.
    unsafe { GLOBAL_COLLECTOR = Collector::new(execution_id, exporter) }
    GLOBAL_INIT.store(INITIALIZED, Ordering::Release);

    Ok(())
}
/// Returns the collector that messages should be sent through.
///
/// Without the `enable` feature this is always the fallback collector. With it, the globally
/// configured collector is returned once `set_exporter` has completed; until then the
/// fallback collector is returned.
pub fn get_collector() -> &'static Collector {
    #[cfg(not(feature = "enable"))]
    {
        &NO_COLLECTOR
    }

    #[cfg(feature = "enable")]
    if GLOBAL_INIT.load(Ordering::Acquire) == INITIALIZED {
        // SAFETY: `INITIALIZED` is stored (with `Release`) only after `set_exporter` has
        // finished writing `GLOBAL_COLLECTOR`, and `set_exporter` never writes it again.
        unsafe {
            #[expect(clippy::deref_addrof, reason = "false positive")]
            &*&raw const GLOBAL_COLLECTOR
        }
    } else {
        &NO_COLLECTOR
    }
}
/// Error returned when a global exporter cannot be installed because one was already set.
///
/// The private unit field prevents construction outside this module.
#[derive(Debug)]
pub struct SetExporterError(());

impl SetExporterError {
    /// Human-readable description used by the [`fmt::Display`] implementation.
    const MESSAGE: &'static str = "a global exporter has already been set";
}

impl fmt::Display for SetExporterError {
    /// Writes the fixed message verbatim; width and fill flags of the formatter are ignored.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = Self::MESSAGE;
        formatter.write_str(text)
    }
}

impl error::Error for SetExporterError {}
impl Collector {
    /// Creates a collector that forwards messages to `exporter` and tags them with
    /// `execution_id`.
    ///
    /// The raw execution id (a 128-bit integer, judging by the shifts below) is split in two:
    /// its upper 64 bits become the trace id prefix and its lower 64 bits seed the trace id
    /// counter.
    #[cfg(feature = "enable")]
    fn new(execution_id: ExecutionId, exporter: &'static (dyn Export + Sync)) -> Self {
        let execution_id_raw = *execution_id;
        // After shifting by 64 only the upper half remains, so this cast is lossless.
        let trace_id_prefix = (execution_id_raw >> 64) as u64;
        // Truncation is intended here: keep only the lower 64 bits.
        let initial_counter_value = execution_id_raw as u64;

        Self {
            inner: CollectorInner {
                execution_id,
                exporter,
                trace_id_prefix,
                trace_id_counter: AtomicU64::new(initial_counter_value),
            },
        }
    }

    /// Generates the next trace id for this collector.
    ///
    /// Returns `TraceId(0)` when the `enable` feature is off or when the trace id prefix is
    /// zero (as it is for the default collectors). Otherwise the result carries the prefix in
    /// its upper 64 bits and the current counter value in its lower 64 bits; the counter is
    /// then incremented, wrapping on overflow.
    #[inline]
    pub(crate) fn generate_trace_id(&self) -> TraceId {
        #[cfg(not(feature = "enable"))]
        {
            TraceId(0)
        }

        #[cfg(feature = "enable")]
        if self.inner.trace_id_prefix == 0 {
            TraceId(0)
        } else {
            let suffix = self.inner.trace_id_counter.fetch_add(1, Ordering::Relaxed);
            // The counter is a full 64 bits wide, so the prefix must be shifted by 64 (not 32)
            // to keep the two halves from overlapping. This mirrors the `>> 64` split in `new`.
            TraceId((u128::from(self.inner.trace_id_prefix) << 64) | u128::from(suffix))
        }
    }
}
/// Message-forwarding entry points, only compiled with the `enable` feature.
#[cfg(feature = "enable")]
impl Collector {
    /// Hands an already assembled `message` to the exporter unchanged.
    #[inline]
    pub fn collect_external(&self, message: InstanceMessage<'_>) {
        let exporter = self.inner.exporter;
        exporter.export(message);
    }

    /// Exports a span-creation message.
    #[inline]
    pub(crate) fn new_span(&self, span: SpanCreateMessage<'_>) {
        let message = TracingMessage::CreateSpan(span);
        self.tracing_message(message);
    }

    /// Exports a span-enter message.
    #[inline]
    pub(crate) fn enter_span(&self, enter: SpanEnterMessage) {
        let message = TracingMessage::EnterSpan(enter);
        self.tracing_message(message);
    }

    /// Exports a span-exit message.
    #[inline]
    pub(crate) fn exit_span(&self, exit: SpanExitMessage) {
        let message = TracingMessage::ExitSpan(exit);
        self.tracing_message(message);
    }

    /// Exports a span-close message.
    #[inline]
    pub(crate) fn close_span(&self, span: SpanCloseMessage) {
        let message = TracingMessage::CloseSpan(span);
        self.tracing_message(message);
    }

    /// Exports an add-event message for a span.
    #[inline]
    pub(crate) fn span_event(&self, event: SpanAddEventMessage<'_>) {
        let message = TracingMessage::AddEvent(event);
        self.tracing_message(message);
    }

    /// Exports an add-link message for a span.
    #[inline]
    pub(crate) fn span_link(&self, link: SpanAddLinkMessage) {
        let message = TracingMessage::AddLink(link);
        self.tracing_message(message);
    }

    /// Exports a set-attribute message for a span.
    #[inline]
    pub(crate) fn span_attribute(&self, attribute: SpanSetAttributeMessage<'_>) {
        let message = TracingMessage::SetAttribute(attribute);
        self.tracing_message(message);
    }

    /// Wraps `log` together with this collector's execution id and exports it.
    #[inline]
    pub(crate) fn log_message(&self, log: LogMessage<'_>) {
        self.collect_external(InstanceMessage {
            execution: self.inner.execution_id,
            message: TelemetryMessage::Log(log),
        });
    }

    /// Wraps a tracing `message` together with this collector's execution id and exports it.
    #[inline]
    fn tracing_message(&self, message: TracingMessage<'_>) {
        self.collect_external(InstanceMessage {
            execution: self.inner.execution_id,
            message: TelemetryMessage::Tracing(message),
        });
    }
}