use opentelemetry::{
global,
trace::{Span, SpanKind, Status, TraceContextExt, Tracer},
Context, KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
trace::{RandomIdGenerator, Sampler, SdkTracerProvider as TracerProvider},
Resource,
};
use opentelemetry_semantic_conventions as semconv;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::RwLock;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
/// Errors reported by the tracing setup code in this module.
///
/// Every variant carries a human-readable message that is interpolated into
/// the `Display` text declared by its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum TracingError {
/// Building the OTLP exporter or installing the `tracing` subscriber failed.
#[error("OpenTelemetry initialization failed: {0}")]
InitializationError(String),
/// A configuration problem.
/// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("Configuration error: {0}")]
ConfigError(String),
/// A trace export failure.
/// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("Failed to export traces: {0}")]
ExportError(String),
/// Any other tracing-related failure.
/// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("Tracing error: {0}")]
Other(String),
}
/// Convenience alias for `std::result::Result` with [`TracingError`] as the error type.
pub type Result<T> = std::result::Result<T, TracingError>;
/// Serializable description of the sampler to install on the tracer provider.
///
/// Each variant is translated into the OpenTelemetry SDK `Sampler` variant of
/// the same name by `TracingManager::create_sampler`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SamplingStrategy {
/// Translated to `Sampler::AlwaysOn`.
AlwaysOn,
/// Translated to `Sampler::AlwaysOff`.
AlwaysOff,
/// Translated to `Sampler::TraceIdRatioBased` with the contained ratio.
/// The value is passed through unchanged; no range check is done in this file.
TraceIdRatioBased(f64),
/// Translated to `Sampler::ParentBased`, wrapping the sampler built
/// (recursively) from `root`.
ParentBased {
/// Strategy converted into the sampler that `Sampler::ParentBased` wraps.
root: Box<SamplingStrategy>,
},
}
impl Default for SamplingStrategy {
fn default() -> Self {
Self::TraceIdRatioBased(0.1) }
}
/// Settings consumed by [`TracingManager`] when it sets up tracing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TracingConfig {
/// Service name attached to the OpenTelemetry resource.
pub service_name: String,
/// Value of the `service.version` resource attribute.
pub service_version: String,
/// OTLP endpoint. When `Some`, a batch span exporter is created for it;
/// when `None`, the tracer provider is built without an exporter.
pub otlp_endpoint: Option<String>,
/// When `false`, `TracingManager::start` logs a message and installs nothing.
pub enabled: bool,
/// Sampler selection for the tracer provider.
pub sampling_strategy: SamplingStrategy,
/// NOTE(review): not read anywhere in the visible part of this file;
/// presumably meant as the batch processor's flush interval — confirm.
pub batch_timeout_ms: u64,
/// NOTE(review): not read anywhere in the visible part of this file;
/// presumably meant as the batch processor's maximum batch size — confirm.
pub max_batch_size: usize,
/// Timeout, in milliseconds, applied to the OTLP exporter.
pub export_timeout_ms: u64,
/// Whether a `tracing_subscriber::fmt` layer is added to the subscriber.
pub console_output: bool,
/// With `console_output`, selects the JSON variant of the fmt layer.
pub json_format: bool,
/// Filter directives used when `EnvFilter::try_from_default_env()` fails.
pub env_filter: String,
/// Extra key/value pairs added to the resource next to the service version.
pub resource_attributes: Vec<(String, String)>,
}
impl Default for TracingConfig {
    /// Returns the stock configuration: tracing enabled, service name
    /// `oxirs-cluster`, the crate's own package version, plain-text console
    /// output filtered at `info`, and the default [`SamplingStrategy`].
    fn default() -> Self {
        // NOTE(review): `TracingManager::start` builds the exporter with
        // `.with_http()`, while 4317 is the port conventionally used for
        // OTLP/gRPC (4318 for OTLP/HTTP) — confirm this default matches the
        // collector it is meant to reach.
        let default_endpoint = String::from("http://localhost:4317");

        Self {
            // Identity
            service_name: String::from("oxirs-cluster"),
            service_version: String::from(env!("CARGO_PKG_VERSION")),
            resource_attributes: Vec::new(),

            // Export
            enabled: true,
            otlp_endpoint: Some(default_endpoint),
            sampling_strategy: SamplingStrategy::default(),
            batch_timeout_ms: 5000,
            max_batch_size: 512,
            export_timeout_ms: 30000,

            // Local log output
            console_output: true,
            json_format: false,
            env_filter: String::from("info"),
        }
    }
}
impl TracingConfig {
    /// Returns the configuration with `service_name` replaced by `name`.
    pub fn with_service_name(self, name: impl Into<String>) -> Self {
        Self {
            service_name: name.into(),
            ..self
        }
    }

    /// Returns the configuration with the OTLP endpoint set to `endpoint`.
    pub fn with_otlp_endpoint(self, endpoint: impl Into<String>) -> Self {
        Self {
            otlp_endpoint: Some(endpoint.into()),
            ..self
        }
    }

    /// Returns the configuration with `sampling_strategy` replaced by `strategy`.
    pub fn with_sampling_strategy(self, strategy: SamplingStrategy) -> Self {
        Self {
            sampling_strategy: strategy,
            ..self
        }
    }

    /// Returns the configuration with the `enabled` flag set to `enabled`.
    pub fn with_enabled(self, enabled: bool) -> Self {
        Self { enabled, ..self }
    }

    /// Returns the configuration with one more `(key, value)` pair appended
    /// to `resource_attributes`. Existing pairs are kept, duplicates included.
    pub fn add_resource_attribute(
        mut self,
        key: impl Into<String>,
        value: impl Into<String>,
    ) -> Self {
        let pair = (key.into(), value.into());
        self.resource_attributes.push(pair);
        self
    }
}
/// Owns the tracing configuration and the tracer provider created from it.
pub struct TracingManager {
// Configuration supplied at construction; never modified afterwards in this file.
config: TracingConfig,
// Provider created by `start` and taken out again by `stop`; `None` otherwise.
tracer_provider: Arc<RwLock<Option<TracerProvider>>>,
// Set to `true` at the end of a successful `start`, reset to `false` by `stop`.
initialized: Arc<RwLock<bool>>,
}
impl TracingManager {
pub async fn new(config: TracingConfig) -> Result<Self> {
if !config.enabled {
return Ok(Self {
config,
tracer_provider: Arc::new(RwLock::new(None)),
initialized: Arc::new(RwLock::new(false)),
});
}
Ok(Self {
config,
tracer_provider: Arc::new(RwLock::new(None)),
initialized: Arc::new(RwLock::new(false)),
})
}
#[allow(deprecated)] #[allow(unused_variables)] pub async fn start(&self) -> Result<()> {
if !self.config.enabled {
tracing::info!("Distributed tracing is disabled");
return Ok(());
}
let mut initialized = self.initialized.write().await;
if *initialized {
return Ok(());
}
let mut all_attrs = vec![KeyValue::new(
semconv::resource::SERVICE_VERSION,
self.config.service_version.clone(),
)];
for (key, value) in &self.config.resource_attributes {
all_attrs.push(KeyValue::new(key.clone(), value.clone()));
}
let resource = Resource::builder_empty()
.with_service_name(self.config.service_name.clone())
.with_attributes(all_attrs)
.build();
let sampler = self.create_sampler(&self.config.sampling_strategy);
let provider = if let Some(otlp_endpoint) = &self.config.otlp_endpoint {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_endpoint(otlp_endpoint.clone())
.with_timeout(Duration::from_millis(self.config.export_timeout_ms))
.build()
.map_err(|e| {
TracingError::InitializationError(format!(
"Failed to create OTLP exporter: {e}"
))
})?;
TracerProvider::builder()
.with_batch_exporter(exporter)
.with_resource(resource)
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.build()
} else {
TracerProvider::builder()
.with_resource(resource)
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.build()
};
global::set_tracer_provider(provider.clone());
let mut tracer_provider = self.tracer_provider.write().await;
*tracer_provider = Some(provider);
self.init_tracing_subscriber()?;
*initialized = true;
tracing::info!(
service_name = %self.config.service_name,
otlp_endpoint = ?self.config.otlp_endpoint,
"Distributed tracing initialized"
);
Ok(())
}
pub async fn stop(&self) -> Result<()> {
if !*self.initialized.read().await {
return Ok(());
}
tracing::info!("Shutting down distributed tracing");
let mut provider = self.tracer_provider.write().await;
if let Some(provider) = provider.take() {
if let Err(e) = provider.shutdown() {
tracing::error!("Failed to shutdown tracer provider: {}", e);
}
}
let mut initialized = self.initialized.write().await;
*initialized = false;
Ok(())
}
pub async fn is_initialized(&self) -> bool {
*self.initialized.read().await
}
pub fn config(&self) -> &TracingConfig {
&self.config
}
fn create_sampler(&self, strategy: &SamplingStrategy) -> Sampler {
match strategy {
SamplingStrategy::AlwaysOn => Sampler::AlwaysOn,
SamplingStrategy::AlwaysOff => Sampler::AlwaysOff,
SamplingStrategy::TraceIdRatioBased(ratio) => Sampler::TraceIdRatioBased(*ratio),
SamplingStrategy::ParentBased { root } => {
let root_sampler = self.create_sampler(root);
Sampler::ParentBased(Box::new(root_sampler))
}
}
}
fn init_tracing_subscriber(&self) -> Result<()> {
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new(&self.config.env_filter));
if self.config.console_output {
if self.config.json_format {
let json_layer = tracing_subscriber::fmt::layer().json();
tracing_subscriber::registry()
.with(env_filter)
.with(json_layer)
.try_init()
.map_err(|e| {
TracingError::InitializationError(format!(
"Failed to initialize subscriber: {e}"
))
})?;
} else {
let fmt_layer = tracing_subscriber::fmt::layer();
tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer)
.try_init()
.map_err(|e| {
TracingError::InitializationError(format!(
"Failed to initialize subscriber: {e}"
))
})?;
}
} else {
tracing_subscriber::registry()
.with(env_filter)
.try_init()
.map_err(|e| {
TracingError::InitializationError(format!(
"Failed to initialize subscriber: {e}"
))
})?;
}
tracing::warn!(
"OpenTelemetry integration temporarily disabled due to version compatibility"
);
Ok(())
}
}
pub mod span_helpers {
use super::*;
pub fn consensus_span(operation: &str, node_id: u64, term: u64) -> impl Span {
let tracer = global::tracer("oxirs-cluster");
let mut span = tracer
.span_builder(format!("consensus.{}", operation))
.with_kind(SpanKind::Internal)
.start(&tracer);
span.set_attribute(KeyValue::new("node.id", node_id as i64));
span.set_attribute(KeyValue::new("consensus.term", term as i64));
span.set_attribute(KeyValue::new("operation.type", "consensus"));
span
}
pub fn replication_span(operation: &str, source_node: u64, target_node: u64) -> impl Span {
let tracer = global::tracer("oxirs-cluster");
let mut span = tracer
.span_builder(format!("replication.{}", operation))
.with_kind(SpanKind::Internal)
.start(&tracer);
span.set_attribute(KeyValue::new("source.node.id", source_node as i64));
span.set_attribute(KeyValue::new("target.node.id", target_node as i64));
span.set_attribute(KeyValue::new("operation.type", "replication"));
span
}
pub fn query_span(query_type: &str, node_id: u64) -> impl Span {
let tracer = global::tracer("oxirs-cluster");
let mut span = tracer
.span_builder(format!("query.{}", query_type))
.with_kind(SpanKind::Server)
.start(&tracer);
span.set_attribute(KeyValue::new("node.id", node_id as i64));
span.set_attribute(KeyValue::new("operation.type", "query"));
span
}
pub fn storage_span(operation: &str, node_id: u64) -> impl Span {
let tracer = global::tracer("oxirs-cluster");
let mut span = tracer
.span_builder(format!("storage.{}", operation))
.with_kind(SpanKind::Internal)
.start(&tracer);
span.set_attribute(KeyValue::new("node.id", node_id as i64));
span.set_attribute(KeyValue::new("operation.type", "storage"));
span
}
pub fn mark_success(span: &mut impl Span) {
span.set_status(Status::Ok);
}
pub fn mark_error(span: &mut impl Span, error: &str) {
span.set_status(Status::error(error.to_string()));
span.set_attribute(KeyValue::new("error", true));
span.set_attribute(KeyValue::new("error.message", error.to_string()));
}
}
/// String snapshot of a span context, suitable for passing between nodes.
#[derive(Debug, Clone)]
pub struct TracingContext {
/// Trace id, as produced by the span context's `trace_id().to_string()`.
pub trace_id: String,
/// Span id, as produced by the span context's `span_id().to_string()`.
pub span_id: String,
/// Parent span id; `TracingContext::current` always leaves this `None`.
pub parent_span_id: Option<String>,
/// Trace flags byte, as produced by the span context's `trace_flags().to_u8()`.
pub trace_flags: u8,
}
impl TracingContext {
    /// Captures the span context of the current OpenTelemetry `Context`.
    ///
    /// Returns `None` when that span context is not valid. The
    /// `parent_span_id` of the returned value is always `None`.
    pub fn current() -> Option<Self> {
        let cx = Context::current();
        let active_span = cx.span();
        let sc = active_span.span_context();

        if !sc.is_valid() {
            return None;
        }

        Some(Self {
            trace_id: sc.trace_id().to_string(),
            span_id: sc.span_id().to_string(),
            parent_span_id: None,
            trace_flags: sc.trace_flags().to_u8(),
        })
    }

    /// Renders the context as `x-trace-id`, `x-span-id` and `x-trace-flags`
    /// header pairs, in that order. The flags are written as a decimal number.
    pub fn to_headers(&self) -> Vec<(String, String)> {
        let header = |name: &str, value: String| (name.to_owned(), value);

        vec![
            header("x-trace-id", self.trace_id.clone()),
            header("x-span-id", self.span_id.clone()),
            header("x-trace-flags", self.trace_flags.to_string()),
        ]
    }
}
/// Serializable set of counters describing tracing activity.
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// updates this type; the field notes below are inferred from the field names
/// only — confirm against whatever code produces these values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TracingStatistics {
/// Presumably the number of spans created so far.
pub total_spans_created: u64,
/// Presumably the number of spans handed to the exporter successfully.
pub total_spans_exported: u64,
/// Presumably the number of failed export attempts.
pub total_export_errors: u64,
/// Presumably the sampling ratio currently in effect.
pub current_sampling_rate: f64,
/// Presumably the mean span duration in milliseconds.
pub avg_span_duration_ms: f64,
/// Presumably whether tracing is currently running.
pub is_active: bool,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration is enabled, named `oxirs-cluster`, and
    /// samples by trace-id ratio at 0.1.
    #[test]
    fn test_tracing_config_default() {
        let defaults = TracingConfig::default();

        assert!(defaults.enabled);
        assert_eq!(defaults.service_name, "oxirs-cluster");
        assert_eq!(
            defaults.sampling_strategy,
            SamplingStrategy::TraceIdRatioBased(0.1)
        );
    }

    /// Each builder method overrides exactly the field it is named after.
    #[test]
    fn test_tracing_config_builder() {
        let built = TracingConfig::default()
            .with_service_name("test-service")
            .with_otlp_endpoint("http://test:4317")
            .with_sampling_strategy(SamplingStrategy::AlwaysOn)
            .with_enabled(false);

        assert!(!built.enabled);
        assert_eq!(built.service_name, "test-service");
        assert_eq!(built.otlp_endpoint, Some("http://test:4317".to_string()));
        assert_eq!(built.sampling_strategy, SamplingStrategy::AlwaysOn);
    }

    /// Sampling strategy variants can be constructed and told apart.
    #[test]
    fn test_sampling_strategies() {
        let on = SamplingStrategy::AlwaysOn;
        let off = SamplingStrategy::AlwaysOff;
        let half = SamplingStrategy::TraceIdRatioBased(0.5);

        assert!(matches!(on, SamplingStrategy::AlwaysOn));
        assert!(matches!(off, SamplingStrategy::AlwaysOff));
        assert_eq!(half, SamplingStrategy::TraceIdRatioBased(0.5));
    }

    /// Constructing a manager from a disabled configuration succeeds.
    #[tokio::test]
    async fn test_tracing_manager_creation() {
        let disabled = TracingConfig::default().with_enabled(false);

        assert!(TracingManager::new(disabled).await.is_ok());
    }

    /// Starting a disabled manager succeeds and leaves it uninitialized.
    #[tokio::test]
    async fn test_tracing_manager_disabled() {
        let disabled = TracingConfig::default().with_enabled(false);
        let manager = TracingManager::new(disabled).await.unwrap();

        assert!(manager.start().await.is_ok());
        assert!(!manager.is_initialized().await);
    }
}