pub mod connection;
pub mod context;
pub mod http_instrumentation;
pub mod json_serializer;
pub mod log_exporter;
pub mod metrics_exporter;
pub mod otel_worker_gauges;
pub mod span_exporter;
pub mod types;
pub mod worker_metrics;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use opentelemetry::propagation::TextMapCompositePropagator;
use opentelemetry::trace::{SpanKind, Status, TraceContextExt, Tracer};
use opentelemetry::{Context as OtelContext, KeyValue};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
use opentelemetry_sdk::trace::SdkTracerProvider;
use tokio::sync::Mutex;
use self::connection::SharedEngineConnection;
use self::log_exporter::EngineLogExporter;
use self::metrics_exporter::EngineMetricsExporter;
use self::span_exporter::EngineSpanExporter;
use self::types::OtelConfig;
/// Everything `init_otel` builds and `shutdown_otel` / `flush_otel` later operate on.
struct OtelState {
/// Tracer provider; a clone of it is also installed as the global tracer provider.
tracer_provider: SdkTracerProvider,
/// Meter provider; `None` when metrics were disabled via `config.metrics_enabled`.
meter_provider: Option<SdkMeterProvider>,
/// Logger provider; `None` when logs were disabled via `config.logs_enabled`.
logger_provider: Option<SdkLoggerProvider>,
/// Connection to the engine shared by the span, metrics and log exporters.
connection: Arc<SharedEngineConnection>,
/// Upper bound applied to the whole shutdown sequence in `shutdown_otel`.
shutdown_timeout: std::time::Duration,
}
/// Lazily created async mutex holding the active pipeline state.
/// It is `None` before `init_otel` stores state and again after `shutdown_otel` takes it.
static OTEL_STATE: OnceLock<Mutex<Option<OtelState>>> = OnceLock::new();
/// Lock-free mirror of whether `OTEL_STATE` currently holds state.
/// `init_otel` sets it after storing state and `shutdown_otel` clears it after taking state,
/// so `is_initialized` can answer without touching the async mutex.
static OTEL_INITIALIZED: AtomicBool = AtomicBool::new(false);
/// Returns the process-wide mutex that guards the OpenTelemetry state.
///
/// The mutex is created on first access, wrapping an empty (`None`) state.
fn get_otel_lock() -> &'static Mutex<Option<OtelState>> {
    // `Mutex<Option<_>>::default()` is `Mutex::new(None)`.
    OTEL_STATE.get_or_init(Mutex::default)
}
pub async fn init_otel(config: OtelConfig) {
let lock = get_otel_lock();
let mut state = lock.lock().await;
if state.is_some() {
tracing::warn!("OpenTelemetry already initialized, skipping");
return;
}
let enabled = config.enabled.unwrap_or_else(|| {
std::env::var("OTEL_ENABLED")
.map(|v| v == "true" || v == "1")
.unwrap_or(true)
});
if !enabled {
tracing::debug!("OpenTelemetry disabled, skipping initialization");
return;
}
let service_name = config
.service_name
.or_else(|| std::env::var("OTEL_SERVICE_NAME").ok())
.unwrap_or_else(|| "iii-rust-sdk".to_string());
let service_version = config
.service_version
.or_else(|| std::env::var("SERVICE_VERSION").ok())
.unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string());
let service_instance_id = config
.service_instance_id
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let ws_url = config
.engine_ws_url
.or_else(|| std::env::var("III_BRIDGE_URL").ok())
.unwrap_or_else(|| "ws://localhost:49134".to_string());
let reconnection_config = config.reconnection_config.unwrap_or_default();
let mut resource_attrs = vec![
KeyValue::new("service.name", service_name.clone()),
KeyValue::new("service.version", service_version),
KeyValue::new("service.instance.id", service_instance_id),
KeyValue::new("telemetry.sdk.name", "iii-rust-sdk"),
KeyValue::new("telemetry.sdk.language", "rust"),
KeyValue::new("telemetry.sdk.version", env!("CARGO_PKG_VERSION")),
];
if let Some(ns) = config.service_namespace {
resource_attrs.push(KeyValue::new("service.namespace", ns));
}
let resource = Resource::builder().with_attributes(resource_attrs).build();
let channel_capacity = config.channel_capacity.unwrap_or(10_000);
let connection = Arc::new(SharedEngineConnection::with_channel_capacity(
ws_url,
reconnection_config,
channel_capacity,
));
let propagator = TextMapCompositePropagator::new(vec![
Box::new(TraceContextPropagator::new()),
Box::new(BaggagePropagator::new()),
]);
opentelemetry::global::set_text_map_propagator(propagator);
let span_exporter = EngineSpanExporter::new(connection.clone());
let tracer_provider = SdkTracerProvider::builder()
.with_resource(resource.clone())
.with_batch_exporter(span_exporter)
.build();
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
let meter_provider = if config.metrics_enabled.unwrap_or(true) {
let metrics_exporter = EngineMetricsExporter::new(connection.clone());
let interval_ms = config.metrics_export_interval_ms.unwrap_or(60_000);
let reader = PeriodicReader::builder(metrics_exporter)
.with_interval(std::time::Duration::from_millis(interval_ms))
.build();
let provider = SdkMeterProvider::builder()
.with_resource(resource.clone())
.with_reader(reader)
.build();
opentelemetry::global::set_meter_provider(provider.clone());
Some(provider)
} else {
None
};
let logger_provider = if config.logs_enabled.unwrap_or(true) {
let log_exporter = EngineLogExporter::new(connection.clone());
Some(
SdkLoggerProvider::builder()
.with_resource(resource)
.with_batch_exporter(log_exporter)
.build(),
)
} else {
None
};
let shutdown_timeout =
std::time::Duration::from_millis(config.shutdown_timeout_ms.unwrap_or(10_000));
let otel_state = OtelState {
tracer_provider,
meter_provider,
logger_provider,
connection,
shutdown_timeout,
};
*state = Some(otel_state);
OTEL_INITIALIZED.store(true, Ordering::Release);
tracing::info!(service = %service_name, "OpenTelemetry initialized");
}
pub async fn shutdown_otel() {
let lock = get_otel_lock();
let mut state = lock.lock().await;
if let Some(otel) = state.take() {
OTEL_INITIALIZED.store(false, Ordering::Release);
let timeout_duration = otel.shutdown_timeout;
match tokio::time::timeout(timeout_duration, async {
if let Err(e) = otel.tracer_provider.shutdown() {
tracing::warn!(error = %e, "Error shutting down tracer provider");
}
if let Some(meter) = otel.meter_provider {
if let Err(e) = meter.shutdown() {
tracing::warn!(error = %e, "Error shutting down meter provider");
}
}
if let Some(logger) = otel.logger_provider {
if let Err(e) = logger.shutdown() {
tracing::warn!(error = %e, "Error shutting down logger provider");
}
}
otel.connection.shutdown().await;
})
.await
{
Ok(()) => {
tracing::info!("OpenTelemetry shut down");
}
Err(_) => {
tracing::warn!(
timeout_ms = timeout_duration.as_millis() as u64,
"OpenTelemetry shutdown timed out"
);
}
}
}
}
pub async fn flush_otel() {
let lock = get_otel_lock();
let state = lock.lock().await;
if let Some(otel) = state.as_ref() {
if let Err(e) = otel.tracer_provider.force_flush() {
tracing::warn!(error = %e, "Error flushing tracer provider");
}
if let Some(meter) = &otel.meter_provider {
if let Err(e) = meter.force_flush() {
tracing::warn!(error = %e, "Error flushing meter provider");
}
}
otel.connection.flush().await;
}
}
pub async fn with_span<F, Fut, T>(
name: &str,
traceparent: Option<&str>,
kind: Option<SpanKind>,
f: F,
) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
{
use opentelemetry::trace::FutureExt as OtelFutureExt;
let tracer = opentelemetry::global::tracer("iii-rust-sdk");
let parent_cx = match traceparent {
Some(tp) => context::extract_traceparent(tp),
None => OtelContext::current(),
};
let span_kind = kind.unwrap_or(SpanKind::Internal);
let span = tracer
.span_builder(name.to_string())
.with_kind(span_kind)
.start_with_context(&tracer, &parent_cx);
let cx = parent_cx.with_span(span);
match f().with_context(cx.clone()).await {
Ok(result) => {
cx.span().set_status(Status::Ok);
Ok(result)
}
Err(err) => {
let span = cx.span();
span.set_status(Status::error(err.to_string()));
span.record_error(&*err);
Err(err)
}
}
}
/// Returns a tracer scoped as `"iii-rust-sdk"` from the global tracer provider.
pub fn get_tracer() -> opentelemetry::global::BoxedTracer {
    let scope = "iii-rust-sdk";
    opentelemetry::global::tracer(scope)
}
/// Returns a meter scoped as `"iii-rust-sdk"` from the global meter provider.
pub fn get_meter() -> opentelemetry::metrics::Meter {
    let scope = "iii-rust-sdk";
    opentelemetry::global::meter(scope)
}
pub fn is_initialized() -> bool {
OTEL_INITIALIZED.load(Ordering::Acquire)
}
/// Returns a clone of the logger provider built by `init_otel`, if available.
///
/// Yields `None` in any of these cases:
/// * OpenTelemetry is not initialized;
/// * logs were disabled in the config;
/// * the state mutex is held elsewhere at the moment of the call.
///
/// The last case arises because this is a synchronous function that uses a
/// non-blocking `try_lock`. It can therefore return `None` spuriously while
/// `init_otel`, `flush_otel` or `shutdown_otel` holds the lock.
pub fn get_logger_provider() -> Option<SdkLoggerProvider> {
    if is_initialized() {
        match get_otel_lock().try_lock() {
            Ok(guard) => guard
                .as_ref()
                .and_then(|otel| otel.logger_provider.clone()),
            Err(_) => None,
        }
    } else {
        None
    }
}