#[cfg(feature = "otlp-tracing")]
use opentelemetry::{KeyValue, global, trace::TracerProvider};
#[cfg(feature = "otlp-tracing")]
use opentelemetry_sdk::trace::SdkTracerProvider;
use std::sync::{Arc, Mutex};
use tracing::warn;
pub mod convenience;
pub mod init;
pub mod logs;
pub mod metrics;
pub mod spans;
pub use crate::config::{
LogConfig, LogDestination, MetricsConfig, MetricsDestination, ObservabilityConfig,
ResourceConfig, SamplingConfig, TracingConfig, TracingDestination,
};
pub use logs::init_logging;
#[cfg(not(target_arch = "wasm32"))]
use tracing_appender::non_blocking::WorkerGuard;
static CONFIG: Mutex<Option<ObservabilityConfig>> = Mutex::new(None);
#[cfg(not(target_arch = "wasm32"))]
static LOG_WORKER_GUARD: Mutex<Option<WorkerGuard>> = Mutex::new(None);
static TEST_METRICS_RECORDER_HANDLE: Mutex<Option<Arc<Mutex<Vec<metrics::CapturedMetric>>>>> =
Mutex::new(None);
#[allow(clippy::type_complexity)]
pub fn init_observability(
config: ObservabilityConfig,
) -> Result<Option<Arc<Mutex<Vec<metrics::CapturedMetric>>>>, crate::error::JacsError> {
if let Ok(mut stored_config) = CONFIG.lock() {
*stored_config = Some(config.clone());
} else {
return Err("CONFIG lock poisoned".into());
}
if let Some(tracing_config) = &config.tracing
&& tracing_config.enabled
{
#[cfg(feature = "otlp-tracing")]
{
match init_tracing(tracing_config) {
Ok(_) => {}
Err(e) => {
warn!(
"Info: init_tracing reported: {} (possibly already initialized)",
e
);
}
}
}
#[cfg(not(feature = "otlp-tracing"))]
{
return Err(
"otlp-tracing feature is not enabled; rebuild with --features otlp-tracing".into(),
);
}
}
match logs::init_logs(&config.logs) {
Ok(guard_option) => {
#[cfg(not(target_arch = "wasm32"))]
if let Some(new_guard) = guard_option
&& let Ok(mut global_guard_handle) = LOG_WORKER_GUARD.lock()
{
if let Some(old_guard) = global_guard_handle.take() {
drop(old_guard);
}
*global_guard_handle = Some(new_guard);
}
}
Err(e) => {
warn!(
"Info: logs::init_logs reported: {} (possibly already initialized)",
e
);
}
}
let mut metrics_handle_for_return: Option<Arc<Mutex<Vec<metrics::CapturedMetric>>>> = None;
match metrics::init_metrics(&config.metrics) {
Ok((captured_arc_option, _meter_provider)) => {
if let Ok(mut global_metrics_handle) = TEST_METRICS_RECORDER_HANDLE.lock() {
*global_metrics_handle = captured_arc_option.clone();
metrics_handle_for_return = captured_arc_option;
}
}
Err(e) => {
warn!(
"Info: metrics::init_metrics reported: {} (possibly already initialized)",
e
);
}
}
Ok(metrics_handle_for_return)
}
pub fn get_config() -> Option<ObservabilityConfig> {
CONFIG.lock().ok()?.clone()
}
pub fn reset_observability() {
if let Ok(mut config_handle) = CONFIG.lock() {
*config_handle = None;
}
if let Ok(handle_option) = TEST_METRICS_RECORDER_HANDLE.lock()
&& let Some(arc) = handle_option.as_ref()
&& let Ok(mut captured_metrics_vec) = arc.lock()
{
captured_metrics_vec.clear();
}
#[cfg(not(target_arch = "wasm32"))]
{
if let Ok(mut guard_opt_handle) = LOG_WORKER_GUARD.lock()
&& let Some(guard) = guard_opt_handle.take()
{
drop(guard); }
}
}
pub fn force_reset_for_tests() {
reset_observability();
if let Ok(mut handle) = TEST_METRICS_RECORDER_HANDLE.lock() {
*handle = None;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
pub fn flush_observability() {
std::thread::sleep(std::time::Duration::from_millis(50));
}
#[cfg(all(not(target_arch = "wasm32"), feature = "otlp-tracing"))]
fn init_tracing(config: &TracingConfig) -> Result<(), crate::error::JacsError> {
use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig};
use opentelemetry_sdk::{
Resource,
trace::{Sampler, SdkTracerProvider},
};
use tracing_subscriber::{Registry, layer::SubscriberExt};
let endpoint = config
.destination
.as_ref()
.map(|dest| match dest {
crate::config::TracingDestination::Otlp { endpoint, .. } => {
if endpoint.ends_with("/v1/traces") {
endpoint.clone()
} else if endpoint.ends_with("/") {
format!("{}v1/traces", endpoint)
} else {
format!("{}/v1/traces", endpoint)
}
}
crate::config::TracingDestination::Jaeger { endpoint, .. } => endpoint.clone(),
})
.unwrap_or_else(|| "http://localhost:4318/v1/traces".to_string());
println!("DEBUG: Using OTLP endpoint: {}", endpoint);
let exporter = SpanExporter::builder()
.with_http()
.with_protocol(Protocol::HttpBinary)
.with_endpoint(endpoint)
.build()
.map_err(|e| crate::error::JacsError::ConfigError(e.to_string()))?;
println!("DEBUG: SpanExporter built successfully with blocking client");
let service_name = config
.resource
.as_ref()
.map(|r| r.service_name.clone())
.unwrap_or_else(|| "jacs-demo".to_string());
let mut resource_builder = Resource::builder().with_service_name(service_name.clone());
if let Some(resource_config) = &config.resource {
if let Some(version) = &resource_config.service_version {
resource_builder =
resource_builder.with_attribute(KeyValue::new("service.version", version.clone()));
}
if let Some(env) = &resource_config.environment {
resource_builder =
resource_builder.with_attribute(KeyValue::new("environment", env.clone()));
}
for (k, v) in &resource_config.attributes {
resource_builder = resource_builder.with_attribute(KeyValue::new(k.clone(), v.clone()));
}
}
let resource = resource_builder.build();
let sampler = if config.sampling.ratio < 1.0 {
Sampler::TraceIdRatioBased(config.sampling.ratio)
} else {
Sampler::AlwaysOn
};
let provider = SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.with_resource(resource)
.with_sampler(sampler)
.build();
let tracer = provider.tracer(service_name);
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = Registry::default()
.with(telemetry)
.with(tracing_subscriber::fmt::layer());
tracing::subscriber::set_global_default(subscriber)
.map_err(|e| crate::error::JacsError::ConfigError(e.to_string()))?;
global::set_tracer_provider(provider);
println!("DEBUG: OpenTelemetry tracing initialized with blocking HTTP client");
Ok(())
}