use crate::error::CoreError;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime};
use uuid::Uuid;
#[cfg(feature = "serialization")]
use serde::{Deserialize, Serialize};
/// `traceparent` version emitted by `TraceContext::to_traceparent` and the only version
/// accepted by `TraceContext::from_traceparent`.
const TRACE_VERSION: u8 = 0x00;
/// Name of the W3C Trace Context propagation header.
#[allow(dead_code)] // not referenced by code in this module yet
const TRACE_HEADER_NAME: &str = "traceparent";
/// Name of the W3C vendor trace-state header.
#[allow(dead_code)] // not referenced by code in this module yet
const TRACE_STATE_HEADER_NAME: &str = "tracestate";
/// Configuration consumed by `TracingSystem::new`.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct TracingConfig {
/// Logical name of the service emitting spans.
pub service_name: String,
/// Version string of the service.
pub service_version: String,
/// Deployment environment label (the default is "production").
pub environment: String,
/// Probability of sampling a new span; `TracingSystem::new` hands it to
/// `ProbabilitySampler::new`, which clamps it into `[0.0, 1.0]`.
pub samplingrate: f64,
/// Upper bound on concurrently active (sampled) spans; starting one more fails with
/// "Maximum active spans exceeded".
pub max_activespans: usize,
/// Age after which `TracingSystem::cleanup_expired_spans` cancels a still-active span.
pub span_timeout: Duration,
/// NOTE(review): not read by code in this module — confirm where it is consumed.
pub enable_performance_attribution: bool,
/// NOTE(review): not read by code in this module — confirm where it is consumed.
pub enable_distributed_context: bool,
/// Attributes added to every sampled span; a span's own attribute with the same key wins.
pub default_attributes: HashMap<String, String>,
/// NOTE(review): not read by code in this module; exporters are attached separately
/// via `TracingSystem::with_exporter`.
pub export_endpoint: Option<String>,
/// NOTE(review): not read by code in this module.
pub export_batch_size: usize,
/// NOTE(review): not read by code in this module.
pub export_timeout: Duration,
}
impl Default for TracingConfig {
fn default() -> Self {
Self {
service_name: "scirs2-core".to_string(),
service_version: env!("CARGO_PKG_VERSION").to_string(),
environment: "production".to_string(),
samplingrate: 1.0,
max_activespans: 10000,
span_timeout: Duration::from_secs(300), enable_performance_attribution: true,
enable_distributed_context: true,
default_attributes: HashMap::new(),
export_endpoint: None,
export_batch_size: 100,
export_timeout: Duration::from_secs(30),
}
}
}
/// Role of a span relative to its caller/callee. The variant names mirror the
/// OpenTelemetry span kinds. NOTE(review): nothing in this module branches on the kind;
/// it is only recorded on the span.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanKind {
/// Work internal to the service (the `SpanBuilder` default).
Internal,
/// By OpenTelemetry convention: handling an incoming request.
Server,
/// By OpenTelemetry convention: issuing an outgoing request.
Client,
/// By OpenTelemetry convention: producing a message for later processing.
Producer,
/// By OpenTelemetry convention: consuming a produced message.
Consumer,
}
/// Outcome recorded on a span.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanStatus {
/// Initial status of every new span; kept unless changed.
Ok,
/// Set by `ActiveSpan::seterror` (or explicitly via `ActiveSpan::set_status`).
Error,
/// Assigned to spans cancelled by expiry cleanup.
Cancelled,
/// Never assigned by code in this module; settable via `ActiveSpan::set_status`.
Unknown,
}
/// Identity and propagation state of one span within a trace.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct TraceContext {
/// Identifier shared by every span in the trace.
pub trace_id: Uuid,
/// Identifier of this span. Only its low 64 bits are emitted in `traceparent`.
pub spanid: Uuid,
/// Span ID of the parent, if this context was derived with `child`.
pub parent_spanid: Option<Uuid>,
/// W3C trace flags; bit 0 is the "sampled" bit (see `is_sampled`).
pub trace_flags: u8,
/// Key/value baggage, inherited by child contexts.
pub baggage: HashMap<String, String>,
/// Opaque `tracestate` value, inherited by child contexts.
pub tracestate: Option<String>,
/// `true` for contexts parsed from `traceparent` or created via `remote_child`.
pub is_remote: bool,
}
impl TraceContext {
#[must_use]
pub fn new() -> Self {
Self {
trace_id: Uuid::new_v4(),
spanid: Uuid::new_v4(),
parent_spanid: None,
trace_flags: 1, baggage: HashMap::new(),
tracestate: None,
is_remote: false,
}
}
#[must_use]
pub fn child(&self) -> Self {
Self {
trace_id: self.trace_id,
spanid: Uuid::new_v4(),
parent_spanid: Some(self.spanid),
trace_flags: self.trace_flags,
baggage: self.baggage.clone(),
tracestate: self.tracestate.clone(),
is_remote: false,
}
}
#[must_use]
pub fn remote_child(&self) -> Self {
let mut child = self.child();
child.is_remote = true;
child
}
#[must_use]
pub const fn is_sampled(&self) -> bool {
self.trace_flags & 1 != 0
}
#[must_use]
pub fn with_baggage(mut self, key: String, value: String) -> Self {
self.baggage.insert(key, value);
self
}
#[must_use]
pub fn with_tracestate(mut self, tracestate: String) -> Self {
self.tracestate = Some(tracestate);
self
}
#[must_use]
pub fn to_traceparent(&self) -> String {
format!(
"{:02x}-{}-{}-{:02x}",
TRACE_VERSION,
self.trace_id.as_simple(),
&self.spanid.as_simple().to_string()[16..], self.trace_flags
)
}
pub fn from_traceparent(header: &str) -> Result<Self, CoreError> {
let parts: Vec<&str> = header.split('-').collect();
if parts.len() != 4 {
return Err(CoreError::ComputationError(
crate::error::ErrorContext::new("Invalid traceparent format".to_string()),
));
}
let version = u8::from_str_radix(parts[0], 16).map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Invalid _version in traceparent".to_string(),
))
})?;
if version != TRACE_VERSION {
return Err(CoreError::ComputationError(
crate::error::ErrorContext::new("Unsupported traceparent _version".to_string()),
));
}
let trace_id = Uuid::parse_str(&format!(
"{}-{}-{}-{}-{}",
&parts[1][0..8],
&parts[1][8..12],
&parts[1][12..16],
&parts[1][16..20],
&parts[1][20..32]
))
.map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Invalid trace ID in traceparent".to_string(),
))
})?;
let spanid_str = if parts[2].len() == 16 {
format!("{:0>32}", parts[2]) } else {
return Err(CoreError::ComputationError(
crate::error::ErrorContext::new(
"Invalid span ID length in traceparent".to_string(),
),
));
};
let spanid = Uuid::parse_str(&format!(
"{}-{}-{}-{}-{}",
&spanid_str[0..8],
&spanid_str[8..12],
&spanid_str[12..16],
&spanid_str[16..20],
&spanid_str[20..32]
))
.map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Invalid span ID in traceparent".to_string(),
))
})?;
let trace_flags = u8::from_str_radix(parts[3], 16).map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Invalid flags in traceparent".to_string(),
))
})?;
Ok(Self {
trace_id,
spanid,
parent_spanid: None,
trace_flags,
baggage: HashMap::new(),
tracestate: None,
is_remote: true,
})
}
#[must_use]
pub fn to_baggage(&self) -> Option<String> {
if self.baggage.is_empty() {
None
} else {
Some(
self.baggage
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join(", "),
)
}
}
#[must_use]
pub fn with_baggage_header(mut self, header: &str) -> Self {
for item in header.split(',') {
let item = item.trim();
if let Some(eq_pos) = item.find('=') {
let key = item[..eq_pos].trim().to_string();
let value = item[eq_pos + 1..].trim().to_string();
self.baggage.insert(key, value);
}
}
self
}
}
impl Default for TraceContext {
fn default() -> Self {
Self::new()
}
}
/// Measurements attached to a span.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct SpanMetrics {
/// Elapsed monotonic time from start to finish; zero until the span is finished.
pub duration: Duration,
/// NOTE(review): not populated by code in this module.
pub cpu_time: Option<Duration>,
/// Growth in memory usage over the span (only set with the `memory_metrics` feature).
pub memory_allocated: Option<u64>,
/// Shrink in memory usage over the span (only set with the `memory_metrics` feature).
pub memory_deallocated: Option<u64>,
/// NOTE(review): not populated by code in this module.
pub peak_memory: Option<u64>,
/// NOTE(review): not incremented by code in this module.
pub child_span_count: usize,
/// User metrics recorded with `ActiveSpan::add_metric`.
pub custom_metrics: HashMap<String, f64>,
}
impl Default for SpanMetrics {
fn default() -> Self {
Self {
duration: Duration::from_nanos(0),
cpu_time: None,
memory_allocated: None,
memory_deallocated: None,
peak_memory: None,
child_span_count: 0,
custom_metrics: HashMap::new(),
}
}
}
/// A unit of traced work, live or finished.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct Span {
/// Trace/span identity and propagation state.
pub context: TraceContext,
/// Operation name.
pub name: String,
/// Role of the span (see `SpanKind`).
pub kind: SpanKind,
/// Wall-clock time at which the span was created.
pub start_time: SystemTime,
/// Wall-clock finish time; `None` while the span is still active.
pub end_time: Option<SystemTime>,
/// Outcome; starts as `SpanStatus::Ok`.
pub status: SpanStatus,
/// Key/value annotations.
pub attributes: HashMap<String, String>,
/// Timestamped events added during the span.
pub events: Vec<SpanEvent>,
/// Measurements; `duration` is filled in when the span finishes.
pub metrics: SpanMetrics,
/// Optional component label (from `SpanBuilder::with_component`).
pub component: Option<String>,
/// Error message set by `ActiveSpan::seterror`.
pub error: Option<String>,
}
/// A named, timestamped occurrence recorded inside a span.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct SpanEvent {
/// Wall-clock time at which the event was added.
pub timestamp: SystemTime,
/// Event name.
pub name: String,
/// Event-specific key/value annotations.
pub attributes: HashMap<String, String>,
}
/// A named, timestamped occurrence recorded inside a span.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct SpanEvent {
/// Wall-clock time at which the event was added.
pub timestamp: SystemTime,
/// Event name.
pub name: String,
/// Event-specific key/value annotations.
pub attributes: HashMap<String, String>,
}
/// Handle to an in-progress span. Dropping it (or calling `end`) finishes the span and
/// passes it to `record_span` on the tracing-system handle it holds.
pub struct ActiveSpan {
/// Shared span data; for sampled spans the same `Arc` is also registered in the
/// active-span map.
span: Arc<Mutex<Span>>,
/// Tracing-system handle whose `record_span` is called when the span finishes.
tracingsystem: Arc<TracingSystem>,
/// Monotonic start time used to compute `metrics.duration`.
start_instant: Instant,
/// Memory usage sampled when the span started (feature `memory_metrics`).
#[cfg(feature = "memory_metrics")]
initial_memory: Option<u64>,
}
impl ActiveSpan {
pub fn add_attribute(&self, key: &str, value: &str) -> Result<(), CoreError> {
let mut span = self.span.lock().map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Failed to acquire span lock".to_string(),
))
})?;
span.attributes.insert(key.to_string(), value.to_string());
Ok(())
}
pub fn add_event(
&self,
name: &str,
attributes: HashMap<String, String>,
) -> Result<(), CoreError> {
let mut span = self.span.lock().map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Failed to acquire span lock".to_string(),
))
})?;
let event = SpanEvent {
timestamp: SystemTime::now(),
name: name.to_string(),
attributes,
};
span.events.push(event);
Ok(())
}
pub fn add_metric(&self, name: &str, value: f64) -> Result<(), CoreError> {
let mut span = self.span.lock().map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Failed to acquire span lock".to_string(),
))
})?;
span.metrics.custom_metrics.insert(name.to_string(), value);
Ok(())
}
pub fn set_status(&self, status: SpanStatus) -> Result<(), CoreError> {
let mut span = self.span.lock().map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Failed to acquire span lock".to_string(),
))
})?;
span.status = status;
Ok(())
}
pub fn seterror(&self, error: &str) -> Result<(), CoreError> {
let mut span = self.span.lock().map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Failed to acquire span lock".to_string(),
))
})?;
span.status = SpanStatus::Error;
span.error = Some(error.to_string());
Ok(())
}
#[must_use]
pub fn in_span<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
CURRENT_SPAN.with(|current| {
let _prev = current.replace(Some(self.span.clone()));
let result = f();
current.replace(_prev);
result
})
}
#[cfg(feature = "async")]
pub async fn in_span_async<F, Fut, R>(&self, f: F) -> R
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = R>,
{
CURRENT_SPAN.with(|current| {
let _prev = current.borrow_mut().replace(self.span.clone());
});
f().await
}
pub fn context(&self) -> Result<TraceContext, CoreError> {
let span = self.span.lock().map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Failed to acquire span lock".to_string(),
))
})?;
Ok(span.context.clone())
}
pub fn end(self) {
drop(self);
}
}
impl Drop for ActiveSpan {
    /// Finishes the span (unless something already finished it) and records it.
    ///
    /// Finishing means setting `end_time` and `metrics.duration`, plus memory deltas with
    /// the `memory_metrics` feature. The span is then passed to the tracing system's
    /// `record_span`; recording failures are reported on stderr because `drop` cannot
    /// return them.
    fn drop(&mut self) {
        // Finalize under the span lock, but hand the snapshot over only after that lock is
        // released. `record_span` takes the active-span map lock, so holding span -> map
        // here could deadlock against any path that takes map -> span.
        let finished = {
            // A poisoned lock still holds usable telemetry; recover it instead of losing the span.
            let mut span = self
                .span
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if span.end_time.is_some() {
                // Already finalized elsewhere (e.g. cancelled by expiry cleanup).
                return;
            }
            span.end_time = Some(SystemTime::now());
            span.metrics.duration = self.start_instant.elapsed();
            #[cfg(feature = "memory_metrics")]
            if let Some(initial_memory) = self.initial_memory {
                if let Ok(current_memory) = get_current_memory_usage() {
                    if current_memory > initial_memory {
                        span.metrics.memory_allocated = Some(current_memory - initial_memory);
                    } else {
                        span.metrics.memory_deallocated = Some(initial_memory - current_memory);
                    }
                }
            }
            span.clone()
        };
        if let Err(e) = self.tracingsystem.record_span(finished) {
            eprintln!("Failed to record span: {e}");
        }
    }
}
/// Builder for a span; consumed by `TracingSystem::start_span_with_builder`.
pub struct SpanBuilder {
/// Operation name of the span.
name: String,
/// Span kind (`SpanKind::Internal` unless changed).
kind: SpanKind,
/// Span-specific attributes; they override same-key default attributes.
attributes: HashMap<String, String>,
/// Explicit parent; when `None` the thread's current span (if any) is used.
parent_context: Option<TraceContext>,
/// Optional component label.
component: Option<String>,
}
impl SpanBuilder {
#[must_use]
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
kind: SpanKind::Internal,
attributes: HashMap::new(),
parent_context: None,
component: None,
}
}
#[must_use]
pub fn with_kind(mut self, kind: SpanKind) -> Self {
self.kind = kind;
self
}
#[must_use]
pub fn with_attribute(mut self, key: &str, value: &str) -> Self {
self.attributes.insert(key.to_string(), value.to_string());
self
}
#[must_use]
pub fn with_parent(mut self, context: TraceContext) -> Self {
self.parent_context = Some(context);
self
}
#[must_use]
pub fn with_component(mut self, component: &str) -> Self {
self.component = Some(component.to_string());
self
}
pub fn start(self, tracingsystem: &TracingSystem) -> Result<ActiveSpan, CoreError> {
tracingsystem.start_span_with_builder(self)
}
}
// Per-thread "current span" used for implicit parenting.
// `ActiveSpan::in_span` installs a span here while its closure runs, and
// `TracingSystem::start_span_with_builder` derives a child context from it when the builder
// has no explicit parent.
thread_local! {
static CURRENT_SPAN: std::cell::RefCell<Option<Arc<Mutex<Span>>>> = const { std::cell::RefCell::new(None) };
}
#[derive(Debug)]
struct SpanStorage {
active_spans: RwLock<HashMap<Uuid, Arc<Mutex<Span>>>>,
completed_spans: Mutex<Vec<Span>>,
max_activespans: usize,
}
impl SpanStorage {
#[must_use]
fn new(max_activespans: usize) -> Self {
Self {
active_spans: RwLock::new(HashMap::new()),
completed_spans: Mutex::new(Vec::new()),
max_activespans,
}
}
fn add_active_span(&self, span: Arc<Mutex<Span>>) -> Result<(), CoreError> {
let mut active = self.active_spans.write().map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Failed to acquire write lock".to_string(),
))
})?;
if active.len() >= self.max_activespans {
return Err(CoreError::ComputationError(
crate::error::ErrorContext::new("Maximum active spans exceeded".to_string()),
));
}
let spanid = {
let span_guard = span.lock().map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Failed to acquire span lock".to_string(),
))
})?;
span_guard.context.spanid
};
active.insert(spanid, span);
Ok(())
}
#[must_use]
fn remove_active_span(&self, spanid: Uuid) -> Option<Arc<Mutex<Span>>> {
if let Ok(mut active) = self.active_spans.write() {
active.remove(&spanid)
} else {
None
}
}
fn record_completed_span(&self, span: Span) -> Result<(), CoreError> {
let mut completed = self.completed_spans.lock().map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Failed to acquire completed spans lock".to_string(),
))
})?;
completed.push(span);
Ok(())
}
#[must_use]
fn get_active_span_count(&self) -> usize {
self.active_spans
.read()
.map(|spans| spans.len())
.unwrap_or(0)
}
fn cleanup_expired_spans(&self, timeout: Duration) -> Result<Vec<Span>, CoreError> {
let mut expired_spans = Vec::new();
let now = SystemTime::now();
let mut to_remove = Vec::new();
{
let active = self.active_spans.read().map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Failed to acquire read lock".to_string(),
))
})?;
for (spanid, span_arc) in active.iter() {
if let Ok(span) = span_arc.lock() {
if let Ok(elapsed) = now.duration_since(span.start_time) {
if elapsed > timeout {
to_remove.push(*spanid);
}
}
}
}
}
for spanid in to_remove {
if let Some(span_arc) = self.remove_active_span(spanid) {
if let Ok(mut span) = span_arc.lock() {
span.status = SpanStatus::Cancelled;
span.end_time = Some(now);
expired_spans.push(span.clone());
}
}
}
Ok(expired_spans)
}
}
pub struct TracingSystem {
config: TracingConfig,
storage: SpanStorage,
sampler: Box<dyn TracingSampler + Send + Sync>,
exporter: Option<Box<dyn TraceExporter + Send + Sync>>,
metrics: Arc<Mutex<TracingMetrics>>,
}
impl TracingSystem {
pub fn new(config: TracingConfig) -> Result<Self, CoreError> {
let storage = SpanStorage::new(config.max_activespans);
let sampler = Box::new(ProbabilitySampler::new(config.samplingrate));
let metrics = Arc::new(Mutex::new(TracingMetrics::default()));
Ok(Self {
config,
storage,
sampler,
exporter: None,
metrics,
})
}
#[must_use]
pub fn with_exporter(mut self, exporter: Box<dyn TraceExporter + Send + Sync>) -> Self {
self.exporter = Some(exporter);
self
}
pub fn start_span(&self, name: &str) -> Result<ActiveSpan, CoreError> {
let builder = SpanBuilder::new(name);
self.start_span_with_builder(builder)
}
pub fn start_span_with_builder(&self, builder: SpanBuilder) -> Result<ActiveSpan, CoreError> {
let context = if let Some(parent) = builder.parent_context {
parent.child()
} else {
CURRENT_SPAN
.with(|current| {
if let Some(current_span) = current.borrow().as_ref() {
if let Ok(span) = current_span.lock() {
Some(span.context.child())
} else {
None
}
} else {
None
}
})
.unwrap_or_default()
};
if !self.sampler.should_sample(&context, &builder.name) {
let span = Span {
context: context.clone(),
name: builder.name,
kind: builder.kind,
start_time: SystemTime::now(),
end_time: None,
status: SpanStatus::Ok,
attributes: builder.attributes,
events: Vec::new(),
metrics: SpanMetrics::default(),
component: builder.component,
error: None,
};
let span_arc = Arc::new(Mutex::new(span));
return Ok(ActiveSpan {
span: span_arc,
tracingsystem: Arc::new(self.clone()),
start_instant: Instant::now(),
#[cfg(feature = "memory_metrics")]
initial_memory: get_current_memory_usage().ok(),
});
}
let mut attributes = self.config.default_attributes.clone();
attributes.extend(builder.attributes);
let span = Span {
context: context.clone(),
name: builder.name,
kind: builder.kind,
start_time: SystemTime::now(),
end_time: None,
status: SpanStatus::Ok,
attributes,
events: Vec::new(),
metrics: SpanMetrics::default(),
component: builder.component,
error: None,
};
let span_arc = Arc::new(Mutex::new(span));
self.storage.add_active_span(span_arc.clone())?;
if let Ok(mut metrics) = self.metrics.lock() {
metrics.spans_started += 1;
metrics.active_spans = self.storage.get_active_span_count();
}
Ok(ActiveSpan {
span: span_arc,
tracingsystem: Arc::new(self.clone()),
start_instant: Instant::now(),
#[cfg(feature = "memory_metrics")]
initial_memory: get_current_memory_usage().ok(),
})
}
#[must_use]
pub fn current_span(&self) -> Option<Arc<Mutex<Span>>> {
CURRENT_SPAN.with(|current| current.borrow().clone())
}
pub fn record_span(&self, span: Span) -> Result<(), CoreError> {
let _ = self.storage.remove_active_span(span.context.spanid);
if let Ok(mut metrics) = self.metrics.lock() {
metrics.spans_completed += 1;
metrics.active_spans = self.storage.get_active_span_count();
if span.status == SpanStatus::Error {
metrics.spans_failed += 1;
}
metrics.total_duration += span.metrics.duration;
}
if let Some(ref exporter) = self.exporter {
exporter.export_span(&span)?;
}
self.storage.record_completed_span(span)?;
Ok(())
}
pub fn cleanup_expired_spans(&self) -> Result<(), CoreError> {
let expired_spans = self
.storage
.cleanup_expired_spans(self.config.span_timeout)?;
for span in expired_spans {
self.record_span(span)?;
}
Ok(())
}
pub fn get_metrics(&self) -> Result<TracingMetrics, CoreError> {
let metrics = self.metrics.lock().map_err(|_| {
CoreError::ComputationError(crate::error::ErrorContext::new(
"Failed to acquire metrics lock".to_string(),
))
})?;
Ok(metrics.clone())
}
pub fn flush(&self) -> Result<(), CoreError> {
if let Some(ref exporter) = self.exporter {
exporter.flush()?;
}
Ok(())
}
}
impl Clone for TracingSystem {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
storage: SpanStorage::new(self.config.max_activespans),
sampler: Box::new(ProbabilitySampler::new(self.config.samplingrate)),
exporter: None, metrics: Arc::new(Mutex::new(TracingMetrics::default())),
}
}
}
/// Aggregate counters kept by a `TracingSystem`; `get_metrics` returns a snapshot.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Default)]
pub struct TracingMetrics {
/// Sampled spans started.
pub spans_started: u64,
/// Spans recorded via `record_span`.
pub spans_completed: u64,
/// Recorded spans whose status was `SpanStatus::Error`.
pub spans_failed: u64,
/// Active-span count as of the last start/record.
pub active_spans: usize,
/// Sum of `metrics.duration` over recorded spans.
pub total_duration: Duration,
/// Spans reported as successfully exported.
pub spans_exported: u64,
/// Span exports reported as failed.
pub export_failures: u64,
}
/// Policy deciding whether a new span is sampled (recorded).
/// `should_sample` takes `&self`, so any state an implementation keeps needs interior
/// mutability (as the samplers below do with atomics and mutexes).
pub trait TracingSampler {
/// Returns `true` to sample the span named `spanname` whose (already derived) context is
/// `context`.
fn should_sample(&self, context: &TraceContext, spanname: &str) -> bool;
}
/// Sampler that keeps each span independently with a fixed probability.
pub struct ProbabilitySampler {
    /// Probability of sampling, clamped into `[0.0, 1.0]` at construction.
    samplingrate: f64,
}

impl ProbabilitySampler {
    /// Creates a sampler with probability `samplingrate`.
    ///
    /// Out-of-range values are clamped into `[0.0, 1.0]`; a NaN rate stays NaN, which
    /// never samples.
    pub fn new(samplingrate: f64) -> Self {
        let clamped = f64::clamp(samplingrate, 0.0, 1.0);
        ProbabilitySampler {
            samplingrate: clamped,
        }
    }
}
impl TracingSampler for ProbabilitySampler {
    /// Always samples at rate >= 1 and never at rate <= 0. In between, it draws a uniform
    /// random number and samples if the draw is below the rate. The context and name are
    /// ignored.
    fn should_sample(&self, _context: &TraceContext, _spanname: &str) -> bool {
        let rate = self.samplingrate;
        if rate >= 1.0 {
            return true;
        }
        if rate <= 0.0 {
            return false;
        }
        use rand::RngExt;
        rand::rng().random::<f64>() < rate
    }
}
/// Sampler whose probability adapts to the observed decision rate.
///
/// Every `adjustment_window` decisions it measures how many decisions per second were made
/// during that window. It then rescales `base_rate` by `target_rate_persecond / observed`,
/// clamped into `[min_rate, max_rate]`. The adjusted rate stays in effect for every
/// decision until the next adjustment.
pub struct AdaptiveSampler {
    /// Rate in effect before the first adjustment; also the reference adjustments scale.
    base_rate: f64,
    /// Lower bound for an adjusted rate.
    min_rate: f64,
    /// Upper bound for an adjusted rate.
    max_rate: f64,
    /// Number of positive sampling decisions so far.
    sample_count: AtomicU64,
    /// Number of sampling decisions so far.
    total_count: AtomicU64,
    /// Decisions between rate adjustments (non-zero).
    adjustment_window: u64,
    /// Decision throughput (per second) at which `base_rate` applies unchanged.
    target_rate_persecond: f64,
    /// Time of the previous adjustment (initially construction time).
    last_adjustment: Mutex<Instant>,
    /// `f64::to_bits` of the rate currently in effect.
    current_rate_bits: AtomicU64,
}

impl AdaptiveSampler {
    /// Creates an adaptive sampler.
    ///
    /// `base_rate` is clamped into `[0.0, 1.0]`. `target_rate_persecond` is the decision
    /// throughput at which `base_rate` is used unchanged.
    pub fn new(base_rate: f64, target_rate_persecond: f64) -> Self {
        let base_rate = base_rate.clamp(0.0, 1.0);
        Self {
            base_rate,
            min_rate: 0.001,
            max_rate: 1.0,
            sample_count: AtomicU64::new(0),
            total_count: AtomicU64::new(0),
            adjustment_window: 1000,
            target_rate_persecond,
            last_adjustment: Mutex::new(Instant::now()),
            current_rate_bits: AtomicU64::new(base_rate.to_bits()),
        }
    }

    /// Rate currently in effect.
    fn current_rate(&self) -> f64 {
        f64::from_bits(self.current_rate_bits.load(Ordering::Relaxed))
    }

    /// Returns the sampling rate to use for the current decision.
    ///
    /// The rate is recomputed and stored when `total_count` hits a multiple of
    /// `adjustment_window`. The adjustment is skipped if another thread is adjusting
    /// concurrently or no time has elapsed.
    fn adjust_samplingrate(&self) -> f64 {
        let total = self.total_count.load(Ordering::Relaxed);
        if total > 0 && total % self.adjustment_window == 0 {
            if let Ok(mut last) = self.last_adjustment.try_lock() {
                let now = Instant::now();
                let elapsed = now.duration_since(*last).as_secs_f64();
                *last = now;
                if elapsed > 0.0 {
                    // Only the decisions made since the previous adjustment (one window)
                    // count; using the lifetime total would inflate the observed rate.
                    let observed_per_second = self.adjustment_window as f64 / elapsed;
                    let new_rate = (self.base_rate * self.target_rate_persecond
                        / observed_per_second)
                        .clamp(self.min_rate, self.max_rate);
                    if !new_rate.is_nan() {
                        self.current_rate_bits
                            .store(new_rate.to_bits(), Ordering::Relaxed);
                    }
                }
            }
        }
        self.current_rate()
    }

    /// Returns `(total decisions, sampled decisions, sampled fraction)`.
    ///
    /// The fraction is `0.0` before any decision has been made.
    pub fn get_stats(&self) -> (u64, u64, f64) {
        let total = self.total_count.load(Ordering::Relaxed);
        let sampled = self.sample_count.load(Ordering::Relaxed);
        let rate = if total > 0 {
            sampled as f64 / total as f64
        } else {
            0.0
        };
        (total, sampled, rate)
    }
}
impl TracingSampler for AdaptiveSampler {
    /// Counts the decision, obtains the (possibly adjusted) rate, decides, and counts
    /// positive decisions. The context and name are ignored.
    fn should_sample(&self, _context: &TraceContext, _spanname: &str) -> bool {
        self.total_count.fetch_add(1, Ordering::Relaxed);
        let rate = self.adjust_samplingrate();
        let sampled = if rate >= 1.0 {
            true
        } else if rate <= 0.0 {
            false
        } else {
            use rand::RngExt;
            rand::rng().random::<f64>() < rate
        };
        if sampled {
            self.sample_count.fetch_add(1, Ordering::Relaxed);
        }
        sampled
    }
}
/// Sampler that admits at most `max_samples_persecond` spans per one-second window.
pub struct RateLimitingSampler {
    /// Maximum positive decisions per window.
    max_samples_persecond: u64,
    /// Positive decisions in the current window.
    sample_count: AtomicU64,
    /// Start of the current window.
    window_start: Mutex<Instant>,
    /// Window length (one second).
    windowsize: Duration,
}

impl RateLimitingSampler {
    /// Creates a limiter allowing `max_samples_persecond` samples per second.
    pub fn new(max_samples_persecond: u64) -> Self {
        let window_start = Mutex::new(Instant::now());
        RateLimitingSampler {
            max_samples_persecond,
            sample_count: AtomicU64::new(0),
            window_start,
            windowsize: Duration::from_secs(1),
        }
    }

    /// Starts a new window, resetting the counter, once the current one has elapsed.
    ///
    /// Returns `true` if a reset happened. If another thread holds the window lock, the
    /// check is skipped and `false` is returned.
    fn reset_window_if_needed(&self) -> bool {
        let Ok(mut window_start) = self.window_start.try_lock() else {
            return false;
        };
        let now = Instant::now();
        if now.duration_since(*window_start) < self.windowsize {
            return false;
        }
        *window_start = now;
        self.sample_count.store(0, Ordering::Relaxed);
        true
    }
}
impl TracingSampler for RateLimitingSampler {
    /// Samples while fewer than `max_samples_persecond` spans were sampled in the current
    /// window. The context and name are ignored.
    fn should_sample(&self, _context: &TraceContext, _spanname: &str) -> bool {
        self.reset_window_if_needed();
        // Claim a slot atomically. A separate `load` followed by `fetch_add` lets
        // concurrent callers all observe `count < max` and overshoot the limit.
        self.sample_count
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
                (count < self.max_samples_persecond).then(|| count + 1)
            })
            .is_ok()
    }
}
/// Destination for finished spans.
pub trait TraceExporter {
    /// Exports one finished span.
    fn export_span(&self, span: &Span) -> Result<(), CoreError>;

    /// Exports `spans` in order, stopping at the first error.
    ///
    /// The default implementation calls [`TraceExporter::export_span`] once per span.
    fn export_spans(&self, spans: &[Span]) -> Result<(), CoreError> {
        spans.iter().try_for_each(|span| self.export_span(span))
    }

    /// Pushes any buffered spans to their destination.
    fn flush(&self) -> Result<(), CoreError>;

    /// Flushes and releases exporter resources.
    fn shutdown(&self) -> Result<(), CoreError>;
}
/// Exporter that buffers spans and forwards them to `inner` in batches.
///
/// A batch is forwarded when the buffer reaches `batch_size` or `batch_timeout` has passed
/// since the last export.
pub struct BatchExporter {
    /// Exporter receiving each batch.
    inner: Box<dyn TraceExporter + Send + Sync>,
    /// Buffer length that triggers a flush.
    batch_size: usize,
    /// Time since the last export that triggers a flush.
    batch_timeout: Duration,
    /// Spans waiting to be exported.
    buffer: Mutex<Vec<Span>>,
    /// When the last non-empty batch was exported (initially construction time).
    last_export: Mutex<Instant>,
}

impl BatchExporter {
    /// Wraps `inner` with batching by size (`batch_size`) and age (`batch_timeout`).
    pub fn new(
        inner: Box<dyn TraceExporter + Send + Sync>,
        batch_size: usize,
        batch_timeout: Duration,
    ) -> Self {
        BatchExporter {
            inner,
            batch_size,
            batch_timeout,
            buffer: Mutex::new(Vec::new()),
            last_export: Mutex::new(Instant::now()),
        }
    }

    /// Whether a flush is due.
    ///
    /// Uses `try_lock`, so a contended lock counts as "not due" for that criterion rather
    /// than blocking.
    fn should_flush(&self) -> bool {
        let buffer_full = self
            .buffer
            .try_lock()
            .is_ok_and(|pending| pending.len() >= self.batch_size);
        buffer_full
            || self
                .last_export
                .try_lock()
                .is_ok_and(|last| last.elapsed() >= self.batch_timeout)
    }

    /// Drains the buffer and exports its contents through `inner`.
    ///
    /// Does nothing when the buffer is empty. Spans drained before a failed export are
    /// not re-queued.
    ///
    /// # Errors
    /// Fails if the buffer lock is poisoned or `inner` fails.
    fn flush_internal(&self) -> Result<(), CoreError> {
        let pending: Vec<Span> = {
            let mut buffer = self.buffer.lock().map_err(|_| {
                CoreError::ComputationError(crate::error::ErrorContext::new(
                    "Failed to acquire buffer lock".to_string(),
                ))
            })?;
            std::mem::take(&mut *buffer)
        };
        if pending.is_empty() {
            return Ok(());
        }
        self.inner.export_spans(&pending)?;
        if let Ok(mut last) = self.last_export.lock() {
            *last = Instant::now();
        }
        Ok(())
    }
}
impl TraceExporter for BatchExporter {
    /// Buffers a copy of `span`, then flushes if the batch is full or stale.
    fn export_span(&self, span: &Span) -> Result<(), CoreError> {
        self.buffer
            .lock()
            .map_err(|_| {
                CoreError::ComputationError(crate::error::ErrorContext::new(
                    "Failed to acquire buffer lock".to_string(),
                ))
            })?
            .push(span.clone());
        if self.should_flush() {
            self.flush_internal()?;
        }
        Ok(())
    }

    /// Exports everything buffered, then flushes `inner`.
    fn flush(&self) -> Result<(), CoreError> {
        self.flush_internal().and_then(|()| self.inner.flush())
    }

    /// Exports everything buffered, then shuts down `inner`.
    fn shutdown(&self) -> Result<(), CoreError> {
        self.flush_internal().and_then(|()| self.inner.shutdown())
    }
}
/// Exporter that writes finished spans to standard output.
pub struct ConsoleExporter {
    /// Multi-line, labelled output when `true`; one compact line per span otherwise.
    prettyprint: bool,
}

impl ConsoleExporter {
    /// Creates a console exporter; `prettyprint` selects the output format.
    pub fn new(prettyprint: bool) -> Self {
        ConsoleExporter { prettyprint }
    }
}
impl TraceExporter for ConsoleExporter {
fn export_span(&self, span: &Span) -> Result<(), CoreError> {
if self.prettyprint {
println!("=== Span Export ===");
println!("Trace ID: {}", span.context.trace_id);
println!("Span ID: {}", span.context.spanid);
println!("Name: {}", span.name);
println!("Duration: {:?}", span.metrics.duration);
println!("Status: {:?}", span.status);
if !span.attributes.is_empty() {
println!("Attributes: {:?}", span.attributes);
}
if !span.events.is_empty() {
println!("Events: {} recorded", span.events.len());
}
println!("==================");
} else {
println!(
"SPAN: {} {} {:?} {:?}",
span.context.trace_id, span.name, span.metrics.duration, span.status
);
}
Ok(())
}
fn flush(&self) -> Result<(), CoreError> {
Ok(())
}
fn shutdown(&self) -> Result<(), CoreError> {
Ok(())
}
}
/// Exporter that POSTs each span to an HTTP endpoint (feature `reqwest`); spans are sent
/// as JSON when the `serialization` feature is also enabled.
#[cfg(feature = "reqwest")]
pub struct HttpExporter {
/// URL each span is POSTed to.
endpoint: String,
/// Blocking HTTP client built with `timeout`.
client: reqwest::blocking::Client,
/// Request timeout; already applied to `client` and not otherwise read.
#[allow(dead_code)]
timeout: Duration,
}
#[cfg(feature = "reqwest")]
impl HttpExporter {
    /// Creates an exporter targeting `endpoint`, with a client whose requests time out
    /// after `timeout`.
    ///
    /// # Errors
    /// Fails if the HTTP client cannot be built.
    pub fn new(endpoint: String, timeout: Duration) -> Result<Self, CoreError> {
        let built = reqwest::blocking::Client::builder()
            .timeout(timeout)
            .build();
        match built {
            Ok(client) => Ok(Self {
                endpoint,
                client,
                timeout,
            }),
            Err(e) => Err(CoreError::ComputationError(
                crate::error::ErrorContext::new(format!("Failed to create HTTP client: {}", e)),
            )),
        }
    }
}
#[cfg(feature = "reqwest")]
impl TraceExporter for HttpExporter {
    /// POSTs `span` as a JSON body (`Content-Type: application/json`) to the endpoint.
    ///
    /// # Errors
    /// Fails in any of these cases:
    /// - serialization or sending fails;
    /// - the response status is not a success;
    /// - the `serialization` feature is disabled (always fails).
    fn export_span(&self, span: &Span) -> Result<(), CoreError> {
        #[cfg(feature = "serialization")]
        {
            let to_error = |message: String| {
                CoreError::ComputationError(crate::error::ErrorContext::new(message))
            };
            let body = serde_json::to_string(span)
                .map_err(|e| to_error(format!("Failed to serialize span: {}", e)))?;
            let response = self
                .client
                .post(&self.endpoint)
                .header("Content-Type", "application/json")
                .body(body)
                .send()
                .map_err(|e| to_error(format!("Failed to send span: {}", e)))?;
            let status = response.status();
            if status.is_success() {
                Ok(())
            } else {
                Err(to_error(format!("Failed to export span: HTTP {}", status)))
            }
        }
        #[cfg(not(feature = "serialization"))]
        {
            Err(CoreError::ComputationError(
                crate::error::ErrorContext::new(
                    "HTTP export requires serialization feature".to_string(),
                ),
            ))
        }
    }

    /// Spans are sent immediately, so there is nothing to flush.
    fn flush(&self) -> Result<(), CoreError> {
        Ok(())
    }

    /// Nothing to release.
    fn shutdown(&self) -> Result<(), CoreError> {
        Ok(())
    }
}
/// Memory probe used by spans when the `memory_metrics` feature is enabled.
///
/// NOTE(review): placeholder — it always reports `0` bytes, so span memory deltas are
/// zero until a real measurement is implemented.
#[cfg(feature = "memory_metrics")]
#[allow(dead_code)]
fn get_current_memory_usage() -> Result<u64, CoreError> {
    let placeholder_bytes: u64 = 0;
    Ok(placeholder_bytes)
}
/// Process-wide tracer installed by [`init_tracing`].
static GLOBAL_TRACER: std::sync::OnceLock<Arc<TracingSystem>> = std::sync::OnceLock::new();

/// Builds a [`TracingSystem`] from `config` and installs it as the global tracer.
///
/// Only the first call installs a tracer. Later calls still build a system from their
/// config, then discard it and return `Ok(())`.
///
/// # Errors
/// Propagates any error from [`TracingSystem::new`].
#[allow(dead_code)]
pub fn init_tracing(config: TracingConfig) -> Result<(), CoreError> {
    let tracer = Arc::new(TracingSystem::new(config)?);
    // Re-initialization is deliberately not an error: the first installed tracer wins.
    let _ = GLOBAL_TRACER.set(tracer);
    Ok(())
}

/// Returns the global tracer, or `None` if [`init_tracing`] has not run.
#[allow(dead_code)]
pub fn global_tracer() -> Option<Arc<TracingSystem>> {
    GLOBAL_TRACER.get().map(Arc::clone)
}
/// Evaluates `$block` inside a span named `$name` on the global tracer, if one exists.
///
/// - The expansion applies `?` to `start_span`, so it can only be used where `?` on a
///   `Result<_, CoreError>` is valid.
/// - With a tracer, `$block` runs inside a closure (`span.in_span(|| $block)`), so a
///   `return` or `?` inside `$block` exits that closure, not the enclosing function.
/// - Without a global tracer, `$block` is evaluated inline and no span is created.
/// - The span finishes when the expansion's `span` binding is dropped.
#[macro_export]
macro_rules! trace_fn {
($name:expr, $block:block) => {{
if let Some(tracer) = $crate::observability::tracing::global_tracer() {
let span = tracer.start_span($name)?;
span.in_span(|| $block)
} else {
$block
}
}};
}
/// Async variant of `trace_fn!` (feature `async`): awaits `$block` as an `async move`
/// future inside a span named `$name` on the global tracer, if one exists.
///
/// - As with `trace_fn!`, `start_span` is applied with `?`.
/// - `$block` becomes the body of an `async move` block, so `return`/`?` inside it exit
///   that future.
/// - NOTE(review): confirm that `async move $block` with a `block` fragment compiles on the
///   supported toolchain.
#[cfg(feature = "async")]
#[macro_export]
macro_rules! trace_async_fn {
($name:expr, $block:block) => {{
if let Some(tracer) = $crate::observability::tracing::global_tracer() {
let span = tracer.start_span($name)?;
span.in_span_async(|| async move $block).await
} else {
async move $block.await
}
}};
}
/// `major.minor.patch` version of the tracing protocol.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct TracingVersion {
    /// Incompatible-change counter.
    pub major: u32,
    /// Backward-compatible feature counter.
    pub minor: u32,
    /// Fix counter; ignored by compatibility checks.
    pub patch: u32,
}

impl TracingVersion {
    /// The version implemented by this module: 1.0.0.
    pub const CURRENT: TracingVersion = TracingVersion {
        major: 1,
        minor: 0,
        patch: 0,
    };

    /// Builds a version from its three components.
    pub fn new(major: u32, minor: u32, patch: u32) -> Self {
        TracingVersion {
            major,
            minor,
            patch,
        }
    }

    /// `true` when `other` has the same major version and a minor version at least as
    /// high as `self`'s; patch levels are not compared.
    pub fn is_compatible(&self, other: &TracingVersion) -> bool {
        other.major == self.major && other.minor >= self.minor
    }
}

impl std::fmt::Display for TracingVersion {
    /// Formats as `major.minor.patch`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let TracingVersion {
            major,
            minor,
            patch,
        } = self;
        write!(f, "{major}.{minor}.{patch}")
    }
}
/// Outcome of a tracing version/feature negotiation.
/// NOTE(review): not constructed by code in this module — confirm where it is produced;
/// the field descriptions below are inferred from the names.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct NegotiationResult {
/// Version both sides agreed on.
pub agreed_version: TracingVersion,
/// Features available under the agreed version.
pub features_supported: Vec<String>,
/// Features turned off by the negotiation.
pub features_disabled: Vec<String>,
}
/// Resource usage attributed to a span or operation.
///
/// Every field is optional; `None` means "not measured". Units follow the field names.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Default)]
pub struct ResourceAttribution {
    /// CPU time (nanoseconds, per the name).
    pub cpu_timens: Option<u64>,
    /// Bytes allocated.
    pub memory_allocated_bytes: Option<u64>,
    /// Bytes deallocated.
    pub memory_deallocated_bytes: Option<u64>,
    /// Peak memory in bytes; merged by maximum rather than sum.
    pub peak_memory_bytes: Option<u64>,
    /// Number of I/O operations.
    pub io_operations: Option<u64>,
    /// Bytes read.
    pub bytes_read: Option<u64>,
    /// Bytes written.
    pub byteswritten: Option<u64>,
    /// Number of network requests.
    pub network_requests: Option<u64>,
    /// GPU memory in bytes.
    pub gpu_memory_bytes: Option<u64>,
    /// GPU compute time (nanoseconds, per the name).
    pub gpu_compute_timens: Option<u64>,
}

impl ResourceAttribution {
    /// Creates an attribution with nothing measured.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the CPU time.
    pub fn with_cpu_time(mut self, cpu_timens: u64) -> Self {
        self.cpu_timens = Some(cpu_timens);
        self
    }

    /// Sets the number of bytes allocated.
    pub fn with_memory_allocation(mut self, bytes: u64) -> Self {
        self.memory_allocated_bytes = Some(bytes);
        self
    }

    /// Sets the I/O operation count and the bytes read and written.
    pub fn with_io_stats(mut self, operations: u64, bytes_read: u64, byteswritten: u64) -> Self {
        self.io_operations = Some(operations);
        self.bytes_read = Some(bytes_read);
        self.byteswritten = Some(byteswritten);
        self
    }

    /// Sets GPU memory usage and compute time.
    pub fn with_gpu_stats(mut self, memory_bytes: u64, compute_timens: u64) -> Self {
        self.gpu_memory_bytes = Some(memory_bytes);
        self.gpu_compute_timens = Some(compute_timens);
        self
    }

    /// Folds `other` into `self`.
    ///
    /// - Counters are summed, saturating at `u64::MAX` instead of overflowing (an overflow
    ///   would panic in debug builds and wrap in release).
    /// - `peak_memory_bytes` takes the maximum.
    /// - A field that is `None` in `other` leaves `self` unchanged.
    /// - A field that is `None` only in `self` is treated as `0`.
    pub fn merge(&mut self, other: &ResourceAttribution) {
        Self::add_into(&mut self.cpu_timens, other.cpu_timens);
        Self::add_into(&mut self.memory_allocated_bytes, other.memory_allocated_bytes);
        Self::add_into(&mut self.memory_deallocated_bytes, other.memory_deallocated_bytes);
        if let Some(peak) = other.peak_memory_bytes {
            self.peak_memory_bytes = Some(self.peak_memory_bytes.map_or(peak, |p| p.max(peak)));
        }
        Self::add_into(&mut self.io_operations, other.io_operations);
        Self::add_into(&mut self.bytes_read, other.bytes_read);
        Self::add_into(&mut self.byteswritten, other.byteswritten);
        Self::add_into(&mut self.network_requests, other.network_requests);
        Self::add_into(&mut self.gpu_memory_bytes, other.gpu_memory_bytes);
        Self::add_into(&mut self.gpu_compute_timens, other.gpu_compute_timens);
    }

    /// Adds `extra` (if measured) to `total`, treating an unmeasured total as zero.
    fn add_into(total: &mut Option<u64>, extra: Option<u64>) {
        if let Some(extra) = extra {
            *total = Some(total.unwrap_or(0).saturating_add(extra));
        }
    }
}
/// Span metrics extended with resource attribution and named performance counters.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Default)]
pub struct EnhancedSpanMetrics {
    /// Timing and memory metrics, as recorded on a span.
    pub basic: SpanMetrics,
    /// Resource usage attributed to the span.
    pub resources: ResourceAttribution,
    /// Named counters accumulated by [`Self::add_performance_counter`].
    pub performance_counters: HashMap<String, u64>,
}

impl EnhancedSpanMetrics {
    /// Creates empty metrics (same as `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds `value` to counter `name`, creating it at zero first if needed.
    ///
    /// The sum saturates at `u64::MAX` rather than overflowing (an overflow would panic in
    /// debug builds and wrap in release).
    pub fn add_performance_counter(&mut self, name: &str, value: u64) {
        // Look up by `&str` first so updating an existing counter allocates nothing.
        if let Some(counter) = self.performance_counters.get_mut(name) {
            *counter = counter.saturating_add(value);
        } else {
            self.performance_counters.insert(name.to_string(), value);
        }
    }

    /// Heuristic, unit-less cost: CPU milliseconds plus allocated MiB plus I/O operations.
    ///
    /// `cpu_timens` is treated as nanoseconds. Unmeasured (`None`) fields contribute nothing.
    pub fn get_total_resource_cost(&self) -> f64 {
        let resources = &self.resources;
        let cpu_ms = resources
            .cpu_timens
            .map_or(0.0, |ns| ns as f64 / 1_000_000.0);
        let memory_mib = resources
            .memory_allocated_bytes
            .map_or(0.0, |bytes| bytes as f64 / 1_048_576.0);
        let io_ops = resources.io_operations.map_or(0.0, |ops| ops as f64);
        cpu_ms + memory_mib + io_ops
    }
}
/// Registers the tracing metrics in the global metrics registry.
///
/// Registers three counters (spans started/completed/failed), an active-span gauge, and
/// a span-duration histogram with buckets 0.001, 0.01, 0.1, 1 and 10.
///
/// # Errors
/// Propagates the first registration error.
#[cfg(feature = "observability")]
#[allow(dead_code)]
pub fn integrate_with_metrics_system() -> Result<(), CoreError> {
    use crate::metrics::{Counter, Gauge, Histogram};
    let registry = crate::metrics::global_metrics_registry();
    for counter_name in [
        "tracing_spans_started",
        "tracing_spans_completed",
        "tracing_spans_failed",
    ] {
        registry.register(
            counter_name.to_string(),
            Counter::new(counter_name.to_string()),
        )?;
    }
    let gauge_name = "tracing_active_spans";
    registry.register(gauge_name.to_string(), Gauge::new(gauge_name.to_string()))?;
    let histogram_name = "tracing_span_duration";
    registry.register(
        histogram_name.to_string(),
        Histogram::with_buckets(
            histogram_name.to_string(),
            vec![0.001, 0.01, 0.1, 1.0, 10.0],
        ),
    )?;
    Ok(())
}
/// Example: traces a simulated matrix multiplication.
///
/// It creates a parent span with two child spans (allocation and compute), exported via
/// a batching console exporter.
///
/// # Errors
/// Propagates tracing errors (span start, attribute/metric updates, flush).
#[allow(dead_code)]
pub fn examplematrix_computation_with_tracing() -> Result<(), CoreError> {
    let config = TracingConfig {
        service_name: "matrix_computation_service".to_string(),
        samplingrate: 1.0,
        enable_performance_attribution: true,
        enable_distributed_context: true,
        ..TracingConfig::default()
    };
    let tracing = TracingSystem::new(config)?;
    // Constructed for illustration only; it is not attached to the tracing system.
    let _adaptive_sampler = AdaptiveSampler::new(0.1, 1000.0);
    // Forward spans to the console in batches of 50, or every 5 seconds.
    let batching = BatchExporter::new(
        Box::new(ConsoleExporter::new(true)),
        50,
        Duration::from_secs(5),
    );
    let tracing = tracing.with_exporter(Box::new(batching));

    let matrix_span = tracing.start_span("matrix_multiplication")?;
    matrix_span.add_attribute("matrix_size", "1000x1000")?;
    matrix_span.add_attribute("algorithm", "block_multiplication")?;

    let _result = matrix_span.in_span(|| {
        let allocation = tracing.start_span("memory_allocation")?;
        allocation.add_attribute("allocation_size", "8MB")?;
        let _memory_result = allocation.in_span(|| {
            std::thread::sleep(Duration::from_millis(10));
            "allocated"
        });

        let compute = tracing.start_span("matrix_compute")?;
        compute.add_metric("flops", 2_000_000_000.0)?;
        let _compute_result = compute.in_span(|| {
            std::thread::sleep(Duration::from_millis(100));
            "computed"
        });

        Ok::<_, CoreError>("matrix_result")
    })?;

    matrix_span.add_attribute("result_status", "success")?;
    matrix_span.end();
    tracing.flush()?;
    Ok(())
}
// Unit tests for this module live in the sibling file `tracing_tests.rs`.
#[cfg(test)]
#[path = "tracing_tests.rs"]
mod tests;