1use crate::error::CoreError;
46use std::collections::HashMap;
47use std::sync::atomic::{AtomicU64, Ordering};
48use std::sync::{Arc, Mutex, RwLock};
49use std::time::{Duration, Instant, SystemTime};
50use uuid::Uuid;
51
52#[cfg(feature = "serialization")]
53use serde::{Deserialize, Serialize};
54
/// Version byte written as the first `traceparent` field by
/// `TraceContext::to_traceparent` and the only version accepted by
/// `TraceContext::from_traceparent`.
const TRACE_VERSION: u8 = 0;
/// The string `traceparent`; not referenced by the code visible in this file.
#[allow(dead_code)]
const TRACE_HEADER_NAME: &str = "traceparent";
/// The string `tracestate`; not referenced by the code visible in this file.
#[allow(dead_code)]
const TRACE_STATE_HEADER_NAME: &str = "tracestate";
61
/// Configuration consumed by `TracingSystem::new`.
///
/// Only `samplingrate`, `max_activespans`, `span_timeout` and
/// `default_attributes` are read by the code visible in this file; the other
/// fields are carried along unchanged.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct TracingConfig {
    /// Name of the service (default: `"scirs2-core"`).
    pub service_name: String,
    /// Version of the service (default: the crate's `CARGO_PKG_VERSION`).
    pub service_version: String,
    /// Deployment environment label (default: `"production"`).
    pub environment: String,
    /// Sampling rate handed to `ProbabilitySampler::new`, which clamps it to
    /// `[0.0, 1.0]` (default: `1.0`).
    pub samplingrate: f64,
    /// Capacity of the active-span map; registering a span fails once this
    /// many spans are active (default: `10000`).
    pub max_activespans: usize,
    /// Age after which `TracingSystem::cleanup_expired_spans` treats an active
    /// span as expired (default: 300 seconds).
    pub span_timeout: Duration,
    /// Performance-attribution switch (default: `true`).
    // NOTE(review): not read anywhere in the visible code - confirm its consumer.
    pub enable_performance_attribution: bool,
    /// Distributed-context switch (default: `true`).
    // NOTE(review): not read anywhere in the visible code - confirm its consumer.
    pub enable_distributed_context: bool,
    /// Attributes merged into every sampled span when it is started; builder
    /// attributes with the same key take precedence.
    pub default_attributes: HashMap<String, String>,
    /// Optional export endpoint (default: `None`).
    // NOTE(review): presumably a URL for an HTTP exporter - confirm.
    pub export_endpoint: Option<String>,
    /// Export batch size (default: `100`).
    // NOTE(review): presumably meant for `BatchExporter` - confirm.
    pub export_batch_size: usize,
    /// Export timeout (default: 30 seconds).
    pub export_timeout: Duration,
}
91
92impl Default for TracingConfig {
93 fn default() -> Self {
94 Self {
95 service_name: "scirs2-core".to_string(),
96 service_version: env!("CARGO_PKG_VERSION").to_string(),
97 environment: "production".to_string(),
98 samplingrate: 1.0,
99 max_activespans: 10000,
100 span_timeout: Duration::from_secs(300), enable_performance_attribution: true,
102 enable_distributed_context: true,
103 default_attributes: HashMap::new(),
104 export_endpoint: None,
105 export_batch_size: 100,
106 export_timeout: Duration::from_secs(30),
107 }
108 }
109}
110
/// Role of a span; `SpanBuilder::new` defaults to [`SpanKind::Internal`].
// NOTE(review): the variant names match the OpenTelemetry span kinds; the
// visible code attaches no behaviour to them - confirm the intended meaning.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanKind {
    /// Default kind assigned by `SpanBuilder::new`.
    Internal,
    /// Presumably the receiving side of a request - confirm.
    Server,
    /// Presumably the calling side of a request - confirm.
    Client,
    /// Presumably the sender of an asynchronous message - confirm.
    Producer,
    /// Presumably the receiver of an asynchronous message - confirm.
    Consumer,
}
126
/// Outcome recorded on a span.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanStatus {
    /// Status every span starts with.
    Ok,
    /// Set by `ActiveSpan::seterror`; counted in `TracingMetrics::spans_failed`.
    Error,
    /// Set by the storage cleanup on spans that exceeded the span timeout.
    Cancelled,
    /// Not assigned by any code visible in this file.
    Unknown,
}
140
/// Identity and propagation data of one span within a trace.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct TraceContext {
    /// Identifier shared by every span of the trace (children copy it).
    pub trace_id: Uuid,
    /// Identifier of this span.
    pub spanid: Uuid,
    /// `spanid` of the context this one was derived from via `child`, if any.
    pub parent_spanid: Option<Uuid>,
    /// Flag byte; bit `0x01` is what `is_sampled` tests.
    pub trace_flags: u8,
    /// Key/value pairs copied into every child context.
    pub baggage: HashMap<String, String>,
    /// Optional trace-state string, copied into every child context.
    pub tracestate: Option<String>,
    /// `true` for contexts built by `from_traceparent` or `remote_child`.
    pub is_remote: bool,
}
160
161impl TraceContext {
162 #[must_use]
164 pub fn new() -> Self {
165 Self {
166 trace_id: Uuid::new_v4(),
167 spanid: Uuid::new_v4(),
168 parent_spanid: None,
169 trace_flags: 1, baggage: HashMap::new(),
171 tracestate: None,
172 is_remote: false,
173 }
174 }
175
176 #[must_use]
178 pub fn child(&self) -> Self {
179 Self {
180 trace_id: self.trace_id,
181 spanid: Uuid::new_v4(),
182 parent_spanid: Some(self.spanid),
183 trace_flags: self.trace_flags,
184 baggage: self.baggage.clone(),
185 tracestate: self.tracestate.clone(),
186 is_remote: false,
187 }
188 }
189
190 #[must_use]
192 pub fn remote_child(&self) -> Self {
193 let mut child = self.child();
194 child.is_remote = true;
195 child
196 }
197
198 #[must_use]
200 pub const fn is_sampled(&self) -> bool {
201 self.trace_flags & 1 != 0
202 }
203
204 #[must_use]
206 pub fn with_baggage(mut self, key: String, value: String) -> Self {
207 self.baggage.insert(key, value);
208 self
209 }
210
211 #[must_use]
213 pub fn with_tracestate(mut self, tracestate: String) -> Self {
214 self.tracestate = Some(tracestate);
215 self
216 }
217
218 #[must_use]
220 pub fn to_traceparent(&self) -> String {
221 format!(
222 "{:02x}-{}-{}-{:02x}",
223 TRACE_VERSION,
224 self.trace_id.as_simple(),
225 &self.spanid.as_simple().to_string()[16..], self.trace_flags
227 )
228 }
229
230 pub fn from_traceparent(header: &str) -> Result<Self, CoreError> {
236 let parts: Vec<&str> = header.split('-').collect();
237 if parts.len() != 4 {
238 return Err(CoreError::ComputationError(
239 crate::error::ErrorContext::new("Invalid traceparent format".to_string()),
240 ));
241 }
242
243 let version = u8::from_str_radix(parts[0], 16).map_err(|_| {
244 CoreError::ComputationError(crate::error::ErrorContext::new(
245 "Invalid _version in traceparent".to_string(),
246 ))
247 })?;
248
249 if version != TRACE_VERSION {
250 return Err(CoreError::ComputationError(
251 crate::error::ErrorContext::new("Unsupported traceparent _version".to_string()),
252 ));
253 }
254
255 let trace_id = Uuid::parse_str(&format!(
256 "{}-{}-{}-{}-{}",
257 &parts[1][0..8],
258 &parts[1][8..12],
259 &parts[1][12..16],
260 &parts[1][16..20],
261 &parts[1][20..32]
262 ))
263 .map_err(|_| {
264 CoreError::ComputationError(crate::error::ErrorContext::new(
265 "Invalid trace ID in traceparent".to_string(),
266 ))
267 })?;
268
269 let spanid_str = if parts[2].len() == 16 {
271 format!("{:0>32}", parts[2]) } else {
273 return Err(CoreError::ComputationError(
274 crate::error::ErrorContext::new(
275 "Invalid span ID length in traceparent".to_string(),
276 ),
277 ));
278 };
279 let spanid = Uuid::parse_str(&format!(
280 "{}-{}-{}-{}-{}",
281 &spanid_str[0..8],
282 &spanid_str[8..12],
283 &spanid_str[12..16],
284 &spanid_str[16..20],
285 &spanid_str[20..32]
286 ))
287 .map_err(|_| {
288 CoreError::ComputationError(crate::error::ErrorContext::new(
289 "Invalid span ID in traceparent".to_string(),
290 ))
291 })?;
292
293 let trace_flags = u8::from_str_radix(parts[3], 16).map_err(|_| {
294 CoreError::ComputationError(crate::error::ErrorContext::new(
295 "Invalid flags in traceparent".to_string(),
296 ))
297 })?;
298
299 Ok(Self {
300 trace_id,
301 spanid,
302 parent_spanid: None,
303 trace_flags,
304 baggage: HashMap::new(),
305 tracestate: None,
306 is_remote: true,
307 })
308 }
309
310 #[must_use]
312 pub fn to_baggage(&self) -> Option<String> {
313 if self.baggage.is_empty() {
314 None
315 } else {
316 Some(
317 self.baggage
318 .iter()
319 .map(|(k, v)| format!("{k}={v}"))
320 .collect::<Vec<_>>()
321 .join(", "),
322 )
323 }
324 }
325
326 #[must_use]
328 pub fn with_baggage_header(mut self, header: &str) -> Self {
329 for item in header.split(',') {
330 let item = item.trim();
331 if let Some(eq_pos) = item.find('=') {
332 let key = item[..eq_pos].trim().to_string();
333 let value = item[eq_pos + 1..].trim().to_string();
334 self.baggage.insert(key, value);
335 }
336 }
337 self
338 }
339}
340
341impl Default for TraceContext {
342 fn default() -> Self {
343 Self::new()
344 }
345}
346
/// Measurements attached to a span.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct SpanMetrics {
    /// Elapsed time of the span; filled in when the `ActiveSpan` is dropped.
    pub duration: Duration,
    /// CPU time; not populated by the code visible in this file.
    pub cpu_time: Option<Duration>,
    /// Growth in memory usage over the span (only set with the
    /// `memory_metrics` feature).
    pub memory_allocated: Option<u64>,
    /// Reduction in memory usage over the span (only set with the
    /// `memory_metrics` feature).
    pub memory_deallocated: Option<u64>,
    /// Peak memory; not populated by the code visible in this file.
    pub peak_memory: Option<u64>,
    /// Number of child spans; not updated by the code visible in this file.
    pub child_span_count: usize,
    /// Named values stored through `ActiveSpan::add_metric`.
    pub custom_metrics: HashMap<String, f64>,
}
366
367impl Default for SpanMetrics {
368 fn default() -> Self {
369 Self {
370 duration: Duration::from_nanos(0),
371 cpu_time: None,
372 memory_allocated: None,
373 memory_deallocated: None,
374 peak_memory: None,
375 child_span_count: 0,
376 custom_metrics: HashMap::new(),
377 }
378 }
379}
380
/// One unit of traced work together with everything recorded about it.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct Span {
    /// Trace/span identity and propagation data.
    pub context: TraceContext,
    /// Span name given to the builder.
    pub name: String,
    /// Span kind given to the builder.
    pub kind: SpanKind,
    /// Wall-clock time at which the span was started.
    pub start_time: SystemTime,
    /// Wall-clock end time; `None` while the span is still running.
    pub end_time: Option<SystemTime>,
    /// Current status; starts as `SpanStatus::Ok`.
    pub status: SpanStatus,
    /// String attributes attached to the span.
    pub attributes: HashMap<String, String>,
    /// Timestamped events added through `ActiveSpan::add_event`.
    pub events: Vec<SpanEvent>,
    /// Measurements for the span.
    pub metrics: SpanMetrics,
    /// Optional component label given to the builder.
    pub component: Option<String>,
    /// Error message stored by `ActiveSpan::seterror`.
    pub error: Option<String>,
}
408
/// A named, timestamped event recorded inside a span.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct SpanEvent {
    /// Wall-clock time at which the event was added.
    pub timestamp: SystemTime,
    /// Event name.
    pub name: String,
    /// Attributes supplied with the event.
    pub attributes: HashMap<String, String>,
}
420
/// Handle to a running span; dropping it ends the span and records it.
pub struct ActiveSpan {
    /// The span data, shared with the storage and the thread-local current span.
    span: Arc<Mutex<Span>>,
    /// Tracing system the span is recorded into on drop.
    // NOTE(review): `TracingSystem::start_span_with_builder` fills this with
    // `Arc::new(self.clone())`, and `TracingSystem::clone` builds fresh storage,
    // no exporter and fresh metrics - so this is not the system that started
    // the span. Confirm whether that is intended.
    tracingsystem: Arc<TracingSystem>,
    /// Monotonic start time used to compute `metrics.duration`.
    start_instant: Instant,
    /// Memory usage sampled at span start, if it could be read.
    #[cfg(feature = "memory_metrics")]
    initial_memory: Option<u64>,
}
429
430impl ActiveSpan {
431 pub fn add_attribute(&self, key: &str, value: &str) -> Result<(), CoreError> {
437 let mut span = self.span.lock().map_err(|_| {
438 CoreError::ComputationError(crate::error::ErrorContext::new(
439 "Failed to acquire span lock".to_string(),
440 ))
441 })?;
442 span.attributes.insert(key.to_string(), value.to_string());
443 Ok(())
444 }
445
446 pub fn add_event(
452 &self,
453 name: &str,
454 attributes: HashMap<String, String>,
455 ) -> Result<(), CoreError> {
456 let mut span = self.span.lock().map_err(|_| {
457 CoreError::ComputationError(crate::error::ErrorContext::new(
458 "Failed to acquire span lock".to_string(),
459 ))
460 })?;
461
462 let event = SpanEvent {
463 timestamp: SystemTime::now(),
464 name: name.to_string(),
465 attributes,
466 };
467
468 span.events.push(event);
469 Ok(())
470 }
471
472 pub fn add_metric(&self, name: &str, value: f64) -> Result<(), CoreError> {
478 let mut span = self.span.lock().map_err(|_| {
479 CoreError::ComputationError(crate::error::ErrorContext::new(
480 "Failed to acquire span lock".to_string(),
481 ))
482 })?;
483 span.metrics.custom_metrics.insert(name.to_string(), value);
484 Ok(())
485 }
486
487 pub fn set_status(&self, status: SpanStatus) -> Result<(), CoreError> {
493 let mut span = self.span.lock().map_err(|_| {
494 CoreError::ComputationError(crate::error::ErrorContext::new(
495 "Failed to acquire span lock".to_string(),
496 ))
497 })?;
498 span.status = status;
499 Ok(())
500 }
501
502 pub fn seterror(&self, error: &str) -> Result<(), CoreError> {
508 let mut span = self.span.lock().map_err(|_| {
509 CoreError::ComputationError(crate::error::ErrorContext::new(
510 "Failed to acquire span lock".to_string(),
511 ))
512 })?;
513 span.status = SpanStatus::Error;
514 span.error = Some(error.to_string());
515 Ok(())
516 }
517
518 #[must_use]
520 pub fn in_span<F, R>(&self, f: F) -> R
521 where
522 F: FnOnce() -> R,
523 {
524 CURRENT_SPAN.with(|current| {
526 let _prev = current.replace(Some(self.span.clone()));
527 let result = f();
528 current.replace(_prev);
529 result
530 })
531 }
532
533 #[cfg(feature = "async")]
535 pub async fn in_span_async<F, Fut, R>(&self, f: F) -> R
536 where
537 F: FnOnce() -> Fut,
538 Fut: std::future::Future<Output = R>,
539 {
540 CURRENT_SPAN.with(|current| {
543 let _prev = current.borrow_mut().replace(self.span.clone());
544 });
547 f().await
548 }
549
550 pub fn context(&self) -> Result<TraceContext, CoreError> {
556 let span = self.span.lock().map_err(|_| {
557 CoreError::ComputationError(crate::error::ErrorContext::new(
558 "Failed to acquire span lock".to_string(),
559 ))
560 })?;
561 Ok(span.context.clone())
562 }
563
564 pub fn end(self) {
566 drop(self);
568 }
569}
570
571impl Drop for ActiveSpan {
572 fn drop(&mut self) {
573 if let Ok(mut span) = self.span.lock() {
575 if span.end_time.is_none() {
576 span.end_time = Some(SystemTime::now());
577 span.metrics.duration = self.start_instant.elapsed();
578
579 #[cfg(feature = "memory_metrics")]
580 if let Some(initial_memory) = self.initial_memory {
581 if let Ok(current_memory) = get_current_memory_usage() {
583 if current_memory > initial_memory {
584 span.metrics.memory_allocated = Some(current_memory - initial_memory);
585 } else {
586 span.metrics.memory_deallocated = Some(initial_memory - current_memory);
587 }
588 }
589 }
590
591 if let Err(e) = self.tracingsystem.record_span(span.clone()) {
593 eprintln!("Failed to record span: {e}");
594 }
595 }
596 }
597 }
598}
599
/// Builder collecting the settings of a span before it is started.
pub struct SpanBuilder {
    /// Span name.
    name: String,
    /// Span kind (defaults to `SpanKind::Internal`).
    kind: SpanKind,
    /// Attributes to attach to the span.
    attributes: HashMap<String, String>,
    /// Explicit parent; when absent the thread's current span (if any) is used.
    parent_context: Option<TraceContext>,
    /// Optional component label.
    component: Option<String>,
}
608
609impl SpanBuilder {
610 #[must_use]
612 pub fn new(name: &str) -> Self {
613 Self {
614 name: name.to_string(),
615 kind: SpanKind::Internal,
616 attributes: HashMap::new(),
617 parent_context: None,
618 component: None,
619 }
620 }
621
622 #[must_use]
624 pub fn with_kind(mut self, kind: SpanKind) -> Self {
625 self.kind = kind;
626 self
627 }
628
629 #[must_use]
631 pub fn with_attribute(mut self, key: &str, value: &str) -> Self {
632 self.attributes.insert(key.to_string(), value.to_string());
633 self
634 }
635
636 #[must_use]
638 pub fn with_parent(mut self, context: TraceContext) -> Self {
639 self.parent_context = Some(context);
640 self
641 }
642
643 #[must_use]
645 pub fn with_component(mut self, component: &str) -> Self {
646 self.component = Some(component.to_string());
647 self
648 }
649
650 pub fn start(self, tracingsystem: &TracingSystem) -> Result<ActiveSpan, CoreError> {
656 tracingsystem.start_span_with_builder(self)
657 }
658}
659
thread_local! {
    /// Per-thread "current span". Set by `ActiveSpan::in_span` /
    /// `in_span_async`, and read when a span is started without an explicit
    /// parent and by `TracingSystem::current_span`. Starts out as `None`.
    static CURRENT_SPAN: std::cell::RefCell<Option<Arc<Mutex<Span>>>> = const { std::cell::RefCell::new(None) };
}
664
/// In-memory store of the spans a tracing system knows about.
#[derive(Debug)]
struct SpanStorage {
    /// Running spans, keyed by span ID.
    active_spans: RwLock<HashMap<Uuid, Arc<Mutex<Span>>>>,
    /// Finished spans, in the order they were recorded. Nothing in the
    /// visible code removes entries from this list.
    completed_spans: Mutex<Vec<Span>>,
    /// Upper bound on the size of `active_spans`.
    max_activespans: usize,
}
672
673impl SpanStorage {
674 #[must_use]
675 fn new(max_activespans: usize) -> Self {
676 Self {
677 active_spans: RwLock::new(HashMap::new()),
678 completed_spans: Mutex::new(Vec::new()),
679 max_activespans,
680 }
681 }
682
683 fn add_active_span(&self, span: Arc<Mutex<Span>>) -> Result<(), CoreError> {
689 let mut active = self.active_spans.write().map_err(|_| {
690 CoreError::ComputationError(crate::error::ErrorContext::new(
691 "Failed to acquire write lock".to_string(),
692 ))
693 })?;
694
695 if active.len() >= self.max_activespans {
696 return Err(CoreError::ComputationError(
697 crate::error::ErrorContext::new("Maximum active spans exceeded".to_string()),
698 ));
699 }
700
701 let spanid = {
702 let span_guard = span.lock().map_err(|_| {
703 CoreError::ComputationError(crate::error::ErrorContext::new(
704 "Failed to acquire span lock".to_string(),
705 ))
706 })?;
707 span_guard.context.spanid
708 };
709
710 active.insert(spanid, span);
711 Ok(())
712 }
713
714 #[must_use]
715 fn remove_active_span(&self, spanid: Uuid) -> Option<Arc<Mutex<Span>>> {
716 if let Ok(mut active) = self.active_spans.write() {
717 active.remove(&spanid)
718 } else {
719 None
720 }
721 }
722
723 fn record_completed_span(&self, span: Span) -> Result<(), CoreError> {
729 let mut completed = self.completed_spans.lock().map_err(|_| {
730 CoreError::ComputationError(crate::error::ErrorContext::new(
731 "Failed to acquire completed spans lock".to_string(),
732 ))
733 })?;
734 completed.push(span);
735 Ok(())
736 }
737
738 #[must_use]
739 fn get_active_span_count(&self) -> usize {
740 self.active_spans
741 .read()
742 .map(|spans| spans.len())
743 .unwrap_or(0)
744 }
745
746 fn cleanup_expired_spans(&self, timeout: Duration) -> Result<Vec<Span>, CoreError> {
752 let mut expired_spans = Vec::new();
753 let now = SystemTime::now();
754 let mut to_remove = Vec::new();
755
756 {
757 let active = self.active_spans.read().map_err(|_| {
758 CoreError::ComputationError(crate::error::ErrorContext::new(
759 "Failed to acquire read lock".to_string(),
760 ))
761 })?;
762
763 for (spanid, span_arc) in active.iter() {
764 if let Ok(span) = span_arc.lock() {
765 if let Ok(elapsed) = now.duration_since(span.start_time) {
766 if elapsed > timeout {
767 to_remove.push(*spanid);
768 }
769 }
770 }
771 }
772 }
773
774 for spanid in to_remove {
775 if let Some(span_arc) = self.remove_active_span(spanid) {
776 if let Ok(mut span) = span_arc.lock() {
777 span.status = SpanStatus::Cancelled;
778 span.end_time = Some(now);
779 expired_spans.push(span.clone());
780 }
781 }
782 }
783
784 Ok(expired_spans)
785 }
786}
787
/// Entry point for starting, recording and exporting spans.
pub struct TracingSystem {
    /// Configuration the system was created with.
    config: TracingConfig,
    /// Active and completed spans.
    storage: SpanStorage,
    /// Decides, per span, whether it is registered and counted.
    sampler: Box<dyn TracingSampler + Send + Sync>,
    /// Optional sink that receives every recorded span.
    exporter: Option<Box<dyn TraceExporter + Send + Sync>>,
    /// Counters describing the system's activity.
    metrics: Arc<Mutex<TracingMetrics>>,
}
796
797impl TracingSystem {
798 pub fn new(config: TracingConfig) -> Result<Self, CoreError> {
804 let storage = SpanStorage::new(config.max_activespans);
805 let sampler = Box::new(ProbabilitySampler::new(config.samplingrate));
806 let metrics = Arc::new(Mutex::new(TracingMetrics::default()));
807
808 Ok(Self {
809 config,
810 storage,
811 sampler,
812 exporter: None,
813 metrics,
814 })
815 }
816
817 #[must_use]
819 pub fn with_exporter(mut self, exporter: Box<dyn TraceExporter + Send + Sync>) -> Self {
820 self.exporter = Some(exporter);
821 self
822 }
823
824 pub fn start_span(&self, name: &str) -> Result<ActiveSpan, CoreError> {
830 let builder = SpanBuilder::new(name);
831 self.start_span_with_builder(builder)
832 }
833
834 pub fn start_span_with_builder(&self, builder: SpanBuilder) -> Result<ActiveSpan, CoreError> {
840 let context = if let Some(parent) = builder.parent_context {
842 parent.child()
843 } else {
844 CURRENT_SPAN
846 .with(|current| {
847 if let Some(current_span) = current.borrow().as_ref() {
848 if let Ok(span) = current_span.lock() {
849 Some(span.context.child())
850 } else {
851 None
852 }
853 } else {
854 None
855 }
856 })
857 .unwrap_or_default()
858 };
859
860 if !self.sampler.should_sample(&context, &builder.name) {
862 let span = Span {
864 context: context.clone(),
865 name: builder.name,
866 kind: builder.kind,
867 start_time: SystemTime::now(),
868 end_time: None,
869 status: SpanStatus::Ok,
870 attributes: builder.attributes,
871 events: Vec::new(),
872 metrics: SpanMetrics::default(),
873 component: builder.component,
874 error: None,
875 };
876
877 let span_arc = Arc::new(Mutex::new(span));
878 return Ok(ActiveSpan {
879 span: span_arc,
880 tracingsystem: Arc::new(self.clone()),
881 start_instant: Instant::now(),
882 #[cfg(feature = "memory_metrics")]
883 initial_memory: get_current_memory_usage().ok(),
884 });
885 }
886
887 let mut attributes = self.config.default_attributes.clone();
889 attributes.extend(builder.attributes);
890
891 let span = Span {
892 context: context.clone(),
893 name: builder.name,
894 kind: builder.kind,
895 start_time: SystemTime::now(),
896 end_time: None,
897 status: SpanStatus::Ok,
898 attributes,
899 events: Vec::new(),
900 metrics: SpanMetrics::default(),
901 component: builder.component,
902 error: None,
903 };
904
905 let span_arc = Arc::new(Mutex::new(span));
906
907 self.storage.add_active_span(span_arc.clone())?;
909
910 if let Ok(mut metrics) = self.metrics.lock() {
912 metrics.spans_started += 1;
913 metrics.active_spans = self.storage.get_active_span_count();
914 }
915
916 Ok(ActiveSpan {
917 span: span_arc,
918 tracingsystem: Arc::new(self.clone()),
919 start_instant: Instant::now(),
920 #[cfg(feature = "memory_metrics")]
921 initial_memory: get_current_memory_usage().ok(),
922 })
923 }
924
925 #[must_use]
927 pub fn current_span(&self) -> Option<Arc<Mutex<Span>>> {
928 CURRENT_SPAN.with(|current| current.borrow().clone())
929 }
930
931 pub fn record_span(&self, span: Span) -> Result<(), CoreError> {
937 let _ = self.storage.remove_active_span(span.context.spanid);
939
940 if let Ok(mut metrics) = self.metrics.lock() {
942 metrics.spans_completed += 1;
943 metrics.active_spans = self.storage.get_active_span_count();
944
945 if span.status == SpanStatus::Error {
946 metrics.spans_failed += 1;
947 }
948
949 metrics.total_duration += span.metrics.duration;
950 }
951
952 if let Some(ref exporter) = self.exporter {
954 exporter.export_span(&span)?;
955 }
956
957 self.storage.record_completed_span(span)?;
959
960 Ok(())
961 }
962
963 pub fn cleanup_expired_spans(&self) -> Result<(), CoreError> {
969 let expired_spans = self
970 .storage
971 .cleanup_expired_spans(self.config.span_timeout)?;
972
973 for span in expired_spans {
974 self.record_span(span)?;
975 }
976
977 Ok(())
978 }
979
980 pub fn get_metrics(&self) -> Result<TracingMetrics, CoreError> {
986 let metrics = self.metrics.lock().map_err(|_| {
987 CoreError::ComputationError(crate::error::ErrorContext::new(
988 "Failed to acquire metrics lock".to_string(),
989 ))
990 })?;
991 Ok(metrics.clone())
992 }
993
994 pub fn flush(&self) -> Result<(), CoreError> {
1000 if let Some(ref exporter) = self.exporter {
1001 exporter.flush()?;
1002 }
1003 Ok(())
1004 }
1005}
1006
1007impl Clone for TracingSystem {
1010 fn clone(&self) -> Self {
1011 Self {
1012 config: self.config.clone(),
1013 storage: SpanStorage::new(self.config.max_activespans),
1014 sampler: Box::new(ProbabilitySampler::new(self.config.samplingrate)),
1015 exporter: None, metrics: Arc::new(Mutex::new(TracingMetrics::default())),
1017 }
1018 }
1019}
1020
/// Counters describing a tracing system's activity.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Default)]
pub struct TracingMetrics {
    /// Sampled spans that were started and registered.
    pub spans_started: u64,
    /// Spans passed to `TracingSystem::record_span`.
    pub spans_completed: u64,
    /// Recorded spans whose status was `SpanStatus::Error`.
    pub spans_failed: u64,
    /// Size of the active-span map at the last start/record.
    pub active_spans: usize,
    /// Sum of `metrics.duration` over all recorded spans.
    pub total_duration: Duration,
    /// Counter for successfully exported spans.
    pub spans_exported: u64,
    /// Counter for failed exports.
    pub export_failures: u64,
}
1040
/// Strategy deciding whether a span is sampled.
pub trait TracingSampler {
    /// Returns `true` if the span with the given context and name should be
    /// sampled. Called once per span by `TracingSystem::start_span_with_builder`.
    fn should_sample(&self, context: &TraceContext, spanname: &str) -> bool;
}
1046
1047pub struct ProbabilitySampler {
1049 samplingrate: f64,
1050}
1051
1052impl ProbabilitySampler {
1053 pub fn new(samplingrate: f64) -> Self {
1054 Self {
1055 samplingrate: samplingrate.clamp(0.0, 1.0),
1056 }
1057 }
1058}
1059
1060impl TracingSampler for ProbabilitySampler {
1061 fn should_sample(&self, _context: &TraceContext, _spanname: &str) -> bool {
1062 if self.samplingrate >= 1.0 {
1063 true
1064 } else if self.samplingrate <= 0.0 {
1065 false
1066 } else {
1067 use rand::Rng;
1068 let mut rng = rand::rng();
1069 rng.random::<f64>() < self.samplingrate
1070 }
1071 }
1072}
1073
1074pub struct AdaptiveSampler {
1076 base_rate: f64,
1077 min_rate: f64,
1078 max_rate: f64,
1079 sample_count: AtomicU64,
1080 total_count: AtomicU64,
1081 adjustment_window: u64,
1082 target_rate_persecond: f64,
1083 last_adjustment: Mutex<Instant>,
1084}
1085
1086impl AdaptiveSampler {
1087 pub fn new(base_rate: f64, target_rate_persecond: f64) -> Self {
1088 Self {
1089 base_rate: base_rate.clamp(0.0, 1.0),
1090 min_rate: 0.001, max_rate: 1.0, sample_count: AtomicU64::new(0),
1093 total_count: AtomicU64::new(0),
1094 adjustment_window: 1000, target_rate_persecond,
1096 last_adjustment: Mutex::new(Instant::now()),
1097 }
1098 }
1099
1100 fn adjust_samplingrate(&self) -> f64 {
1101 let total = self.total_count.load(Ordering::Relaxed);
1102 if total % self.adjustment_window == 0 && total > 0 {
1103 if let Ok(mut last) = self.last_adjustment.try_lock() {
1104 let now = Instant::now();
1105 let elapsed = now.duration_since(*last).as_secs_f64();
1106 *last = now;
1107
1108 if elapsed > 0.0 {
1109 let current_rate = total as f64 / elapsed;
1110 let adjustment_factor = self.target_rate_persecond / current_rate;
1111 let new_rate =
1112 (self.base_rate * adjustment_factor).clamp(self.min_rate, self.max_rate);
1113 return new_rate;
1114 }
1115 }
1116 }
1117 self.base_rate
1118 }
1119
1120 pub fn get_stats(&self) -> (u64, u64, f64) {
1121 let total = self.total_count.load(Ordering::Relaxed);
1122 let sampled = self.sample_count.load(Ordering::Relaxed);
1123 let rate = if total > 0 {
1124 sampled as f64 / total as f64
1125 } else {
1126 0.0
1127 };
1128 (total, sampled, rate)
1129 }
1130}
1131
1132impl TracingSampler for AdaptiveSampler {
1133 fn should_sample(&self, _context: &TraceContext, _spanname: &str) -> bool {
1134 self.total_count.fetch_add(1, Ordering::Relaxed);
1135
1136 let current_rate = self.adjust_samplingrate();
1137
1138 if current_rate >= 1.0 {
1139 self.sample_count.fetch_add(1, Ordering::Relaxed);
1140 true
1141 } else if current_rate <= 0.0 {
1142 false
1143 } else {
1144 use rand::Rng;
1145 let mut rng = rand::rng();
1146 if rng.random::<f64>() < current_rate {
1147 self.sample_count.fetch_add(1, Ordering::Relaxed);
1148 true
1149 } else {
1150 false
1151 }
1152 }
1153 }
1154}
1155
1156pub struct RateLimitingSampler {
1158 max_samples_persecond: u64,
1159 sample_count: AtomicU64,
1160 window_start: Mutex<Instant>,
1161 windowsize: Duration,
1162}
1163
1164impl RateLimitingSampler {
1165 pub fn new(max_samples_persecond: u64) -> Self {
1166 Self {
1167 max_samples_persecond,
1168 sample_count: AtomicU64::new(0),
1169 window_start: Mutex::new(Instant::now()),
1170 windowsize: Duration::from_secs(1),
1171 }
1172 }
1173
1174 fn reset_window_if_needed(&self) -> bool {
1175 if let Ok(mut start) = self.window_start.try_lock() {
1176 let now = Instant::now();
1177 if now.duration_since(*start) >= self.windowsize {
1178 *start = now;
1179 self.sample_count.store(0, Ordering::Relaxed);
1180 return true;
1181 }
1182 }
1183 false
1184 }
1185}
1186
1187impl TracingSampler for RateLimitingSampler {
1188 fn should_sample(&self, _context: &TraceContext, _spanname: &str) -> bool {
1189 self.reset_window_if_needed();
1190
1191 let current_count = self.sample_count.load(Ordering::Relaxed);
1192 if current_count < self.max_samples_persecond {
1193 self.sample_count.fetch_add(1, Ordering::Relaxed);
1194 true
1195 } else {
1196 false
1197 }
1198 }
1199}
1200
1201pub trait TraceExporter {
1203 fn export_span(&self, span: &Span) -> Result<(), CoreError>;
1205
1206 fn export_spans(&self, spans: &[Span]) -> Result<(), CoreError> {
1208 for span in spans {
1209 self.export_span(span)?;
1210 }
1211 Ok(())
1212 }
1213
1214 fn flush(&self) -> Result<(), CoreError>;
1216
1217 fn shutdown(&self) -> Result<(), CoreError>;
1219}
1220
1221pub struct BatchExporter {
1223 inner: Box<dyn TraceExporter + Send + Sync>,
1224 batch_size: usize,
1225 batch_timeout: Duration,
1226 buffer: Mutex<Vec<Span>>,
1227 last_export: Mutex<Instant>,
1228}
1229
1230impl BatchExporter {
1231 pub fn new(
1232 inner: Box<dyn TraceExporter + Send + Sync>,
1233 batch_size: usize,
1234 batch_timeout: Duration,
1235 ) -> Self {
1236 Self {
1237 inner,
1238 batch_size,
1239 batch_timeout,
1240 buffer: Mutex::new(Vec::new()),
1241 last_export: Mutex::new(Instant::now()),
1242 }
1243 }
1244
1245 fn should_flush(&self) -> bool {
1246 if let Ok(buffer) = self.buffer.try_lock() {
1247 if buffer.len() >= self.batch_size {
1248 return true;
1249 }
1250 }
1251
1252 if let Ok(last_export) = self.last_export.try_lock() {
1253 if last_export.elapsed() >= self.batch_timeout {
1254 return true;
1255 }
1256 }
1257
1258 false
1259 }
1260
1261 fn flush_internal(&self) -> Result<(), CoreError> {
1262 let spans_to_export = {
1263 let mut buffer = self.buffer.lock().map_err(|_| {
1264 CoreError::ComputationError(crate::error::ErrorContext::new(
1265 "Failed to acquire buffer lock".to_string(),
1266 ))
1267 })?;
1268 if buffer.is_empty() {
1269 return Ok(());
1270 }
1271 let spans = buffer.drain(..).collect::<Vec<_>>();
1272 spans
1273 };
1274
1275 if !spans_to_export.is_empty() {
1276 self.inner.export_spans(&spans_to_export)?;
1277
1278 if let Ok(mut last_export) = self.last_export.lock() {
1279 *last_export = Instant::now();
1280 }
1281 }
1282
1283 Ok(())
1284 }
1285}
1286
1287impl TraceExporter for BatchExporter {
1288 fn export_span(&self, span: &Span) -> Result<(), CoreError> {
1289 {
1290 let mut buffer = self.buffer.lock().map_err(|_| {
1291 CoreError::ComputationError(crate::error::ErrorContext::new(
1292 "Failed to acquire buffer lock".to_string(),
1293 ))
1294 })?;
1295 buffer.push(span.clone());
1296 }
1297
1298 if self.should_flush() {
1299 self.flush_internal()?;
1300 }
1301
1302 Ok(())
1303 }
1304
1305 fn flush(&self) -> Result<(), CoreError> {
1306 self.flush_internal()?;
1307 self.inner.flush()
1308 }
1309
1310 fn shutdown(&self) -> Result<(), CoreError> {
1311 self.flush_internal()?;
1312 self.inner.shutdown()
1313 }
1314}
1315
1316pub struct ConsoleExporter {
1318 prettyprint: bool,
1319}
1320
1321impl ConsoleExporter {
1322 pub fn new(prettyprint: bool) -> Self {
1323 Self { prettyprint }
1324 }
1325}
1326
1327impl TraceExporter for ConsoleExporter {
1328 fn export_span(&self, span: &Span) -> Result<(), CoreError> {
1329 if self.prettyprint {
1330 println!("=== Span Export ===");
1331 println!("Trace ID: {}", span.context.trace_id);
1332 println!("Span ID: {}", span.context.spanid);
1333 println!("Name: {}", span.name);
1334 println!("Duration: {:?}", span.metrics.duration);
1335 println!("Status: {:?}", span.status);
1336 if !span.attributes.is_empty() {
1337 println!("Attributes: {:?}", span.attributes);
1338 }
1339 if !span.events.is_empty() {
1340 println!("Events: {} recorded", span.events.len());
1341 }
1342 println!("==================");
1343 } else {
1344 println!(
1345 "SPAN: {} {} {:?} {:?}",
1346 span.context.trace_id, span.name, span.metrics.duration, span.status
1347 );
1348 }
1349 Ok(())
1350 }
1351
1352 fn flush(&self) -> Result<(), CoreError> {
1353 Ok(())
1355 }
1356
1357 fn shutdown(&self) -> Result<(), CoreError> {
1358 Ok(())
1359 }
1360}
1361
/// Exporter that POSTs each span as JSON to an HTTP endpoint.
#[cfg(feature = "reqwest")]
pub struct HttpExporter {
    /// URL the spans are posted to.
    endpoint: String,
    /// Blocking HTTP client configured with the request timeout.
    client: reqwest::blocking::Client,
    /// Timeout the client was built with (kept for reference only).
    #[allow(dead_code)]
    timeout: Duration,
}

#[cfg(feature = "reqwest")]
impl HttpExporter {
    /// Creates an exporter posting to `endpoint` with the given request timeout.
    ///
    /// # Errors
    ///
    /// Fails if the HTTP client cannot be built.
    pub fn new(endpoint: String, timeout: Duration) -> Result<Self, CoreError> {
        let built = reqwest::blocking::Client::builder()
            .timeout(timeout)
            .build();

        match built {
            Ok(client) => Ok(HttpExporter {
                endpoint,
                client,
                timeout,
            }),
            Err(e) => Err(CoreError::ComputationError(
                crate::error::ErrorContext::new(format!("Failed to create HTTP client: {}", e)),
            )),
        }
    }
}

#[cfg(feature = "reqwest")]
impl TraceExporter for HttpExporter {
    /// Serializes the span to JSON and POSTs it to the endpoint.
    ///
    /// # Errors
    ///
    /// Fails when serialization fails, the request cannot be sent, or the
    /// response status is not a success. Without the `serialization` feature
    /// this always fails.
    fn export_span(&self, span: &Span) -> Result<(), CoreError> {
        #[cfg(feature = "serialization")]
        {
            let json = match serde_json::to_string(span) {
                Ok(json) => json,
                Err(e) => {
                    return Err(CoreError::ComputationError(
                        crate::error::ErrorContext::new(format!(
                            "Failed to serialize span: {}",
                            e
                        )),
                    ))
                }
            };

            let request = self
                .client
                .post(&self.endpoint)
                .header("Content-Type", "application/json")
                .body(json);

            let response = match request.send() {
                Ok(response) => response,
                Err(e) => {
                    return Err(CoreError::ComputationError(
                        crate::error::ErrorContext::new(format!("Failed to send span: {}", e)),
                    ))
                }
            };

            if response.status().is_success() {
                Ok(())
            } else {
                Err(CoreError::ComputationError(
                    crate::error::ErrorContext::new(format!(
                        "Failed to export span: HTTP {}",
                        response.status()
                    )),
                ))
            }
        }

        #[cfg(not(feature = "serialization"))]
        {
            Err(CoreError::ComputationError(
                crate::error::ErrorContext::new(
                    "HTTP export requires serialization feature".to_string(),
                ),
            ))
        }
    }

    /// Every span is sent immediately, so this is a no-op.
    fn flush(&self) -> Result<(), CoreError> {
        Ok(())
    }

    /// No resources need releasing, so this is a no-op.
    fn shutdown(&self) -> Result<(), CoreError> {
        Ok(())
    }
}
1448
/// Returns the process's current memory usage.
///
/// This is a stub: it always returns `Ok(0)`, so the memory deltas computed
/// from it when a span ends are always zero.
// NOTE(review): presumably meant to be replaced by a platform-specific
// implementation - confirm.
#[cfg(feature = "memory_metrics")]
#[allow(dead_code)]
fn get_current_memory_usage() -> Result<u64, CoreError> {
    Ok(0)
}
1460
/// Process-wide tracer; set at most once by `init_tracing` and read by
/// `global_tracer`.
static GLOBAL_TRACER: std::sync::OnceLock<Arc<TracingSystem>> = std::sync::OnceLock::new();
1463
1464#[allow(dead_code)]
1466pub fn init_tracing(config: TracingConfig) -> Result<(), CoreError> {
1467 let tracer = TracingSystem::new(config)?;
1468 match GLOBAL_TRACER.set(Arc::new(tracer)) {
1469 Ok(()) => Ok(()),
1470 Err(_) => {
1471 Ok(())
1473 }
1474 }
1475}
1476
1477#[allow(dead_code)]
1479pub fn global_tracer() -> Option<Arc<TracingSystem>> {
1480 GLOBAL_TRACER.get().cloned()
1481}
1482
/// Evaluates `$block` inside a span named `$name` on the global tracer.
///
/// When a global tracer is installed, a span is started and the block runs
/// via `ActiveSpan::in_span`; otherwise the block runs directly. Starting the
/// span uses `?`, so the macro can only be used in a function whose return
/// type accepts the tracer's error. The macro resolves the tracer through
/// `$crate::observability::tracing::global_tracer`.
#[macro_export]
macro_rules! trace_fn {
    ($name:expr, $block:block) => {{
        if let Some(tracer) = $crate::observability::tracing::global_tracer() {
            let span = tracer.start_span($name)?;
            span.in_span(|| $block)
        } else {
            $block
        }
    }};
}
1495
/// Async counterpart of `trace_fn!`: awaits `$block` inside a span named
/// `$name` on the global tracer.
///
/// When a global tracer is installed, a span is started and the block runs
/// as an `async move` block via `ActiveSpan::in_span_async`; otherwise the
/// block is awaited directly. Starting the span uses `?`, so the enclosing
/// async function must return a type that accepts the tracer's error.
#[cfg(feature = "async")]
#[macro_export]
macro_rules! trace_async_fn {
    ($name:expr, $block:block) => {{
        if let Some(tracer) = $crate::observability::tracing::global_tracer() {
            let span = tracer.start_span($name)?;
            span.in_span_async(|| async move $block).await
        } else {
            async move $block.await
        }
    }};
}
1509
/// A `major.minor.patch` version number.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct TracingVersion {
    /// Major component.
    pub major: u32,
    /// Minor component.
    pub minor: u32,
    /// Patch component.
    pub patch: u32,
}

impl TracingVersion {
    /// The version `1.0.0`.
    pub const CURRENT: TracingVersion = TracingVersion {
        major: 1,
        minor: 0,
        patch: 0,
    };

    /// Builds a version from its three components.
    pub fn new(major: u32, minor: u32, patch: u32) -> Self {
        TracingVersion {
            major,
            minor,
            patch,
        }
    }

    /// Returns `true` when both versions share the major component and this
    /// version's minor component does not exceed `other`'s. The patch
    /// component is ignored.
    pub fn is_compatible(&self, other: &TracingVersion) -> bool {
        if self.major != other.major {
            return false;
        }
        self.minor <= other.minor
    }
}

impl std::fmt::Display for TracingVersion {
    /// Writes the version as `major.minor.patch`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let TracingVersion {
            major,
            minor,
            patch,
        } = self;
        write!(f, "{major}.{minor}.{patch}")
    }
}
1544
1545#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
1547#[derive(Debug, Clone)]
1548pub struct NegotiationResult {
1549 pub agreed_version: TracingVersion,
1550 pub features_supported: Vec<String>,
1551 pub features_disabled: Vec<String>,
1552}
1553
/// Optional resource-usage figures that can be attributed to a unit of work.
///
/// Every field starts as `None` (see [`Default`]) and is only set once a value
/// is recorded. Units follow the field names: `*_bytes` fields are byte
/// counts and `*_timens` fields are durations in nanoseconds.
#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Default)]
pub struct ResourceAttribution {
    /// CPU time, in nanoseconds.
    pub cpu_timens: Option<u64>,
    /// Bytes of memory allocated.
    pub memory_allocated_bytes: Option<u64>,
    /// Bytes of memory deallocated.
    pub memory_deallocated_bytes: Option<u64>,
    /// Peak memory, in bytes; merged with `max` rather than summed.
    pub peak_memory_bytes: Option<u64>,
    /// Number of I/O operations.
    pub io_operations: Option<u64>,
    /// Bytes read.
    pub bytes_read: Option<u64>,
    /// Bytes written.
    pub byteswritten: Option<u64>,
    /// Number of network requests.
    pub network_requests: Option<u64>,
    /// GPU memory, in bytes.
    pub gpu_memory_bytes: Option<u64>,
    /// GPU compute time, in nanoseconds.
    pub gpu_compute_timens: Option<u64>,
}

impl ResourceAttribution {
    /// Creates an attribution record with every field unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the CPU time (nanoseconds) and returns the updated record.
    pub fn with_cpu_time(mut self, cpu_timens: u64) -> Self {
        self.cpu_timens = Some(cpu_timens);
        self
    }

    /// Sets the allocated-memory byte count and returns the updated record.
    pub fn with_memory_allocation(mut self, bytes: u64) -> Self {
        self.memory_allocated_bytes = Some(bytes);
        self
    }

    /// Sets the I/O operation count and the read/written byte counts.
    pub fn with_io_stats(mut self, operations: u64, bytes_read: u64, byteswritten: u64) -> Self {
        self.io_operations = Some(operations);
        self.bytes_read = Some(bytes_read);
        self.byteswritten = Some(byteswritten);
        self
    }

    /// Sets the GPU memory byte count and GPU compute time (nanoseconds).
    pub fn with_gpu_stats(mut self, memory_bytes: u64, compute_timens: u64) -> Self {
        self.gpu_memory_bytes = Some(memory_bytes);
        self.gpu_compute_timens = Some(compute_timens);
        self
    }

    /// Folds the figures from `other` into `self`.
    ///
    /// Fields that are `None` in `other` leave `self` untouched, so a field
    /// that is unset on both sides stays unset. `peak_memory_bytes` keeps the
    /// larger of the two values; every other field is summed, treating a
    /// missing value in `self` as zero.
    ///
    /// Sums saturate at `u64::MAX` instead of overflowing, so merging very
    /// large totals neither panics (debug builds) nor wraps around (release
    /// builds).
    pub fn merge(&mut self, other: &ResourceAttribution) {
        Self::saturating_accumulate(&mut self.cpu_timens, other.cpu_timens);
        Self::saturating_accumulate(
            &mut self.memory_allocated_bytes,
            other.memory_allocated_bytes,
        );
        Self::saturating_accumulate(
            &mut self.memory_deallocated_bytes,
            other.memory_deallocated_bytes,
        );

        // A peak is a high-water mark, so it is combined with `max`, not `+`.
        if let Some(peak) = other.peak_memory_bytes {
            let merged = self
                .peak_memory_bytes
                .map_or(peak, |current| current.max(peak));
            self.peak_memory_bytes = Some(merged);
        }

        Self::saturating_accumulate(&mut self.io_operations, other.io_operations);
        Self::saturating_accumulate(&mut self.bytes_read, other.bytes_read);
        Self::saturating_accumulate(&mut self.byteswritten, other.byteswritten);
        Self::saturating_accumulate(&mut self.network_requests, other.network_requests);
        Self::saturating_accumulate(&mut self.gpu_memory_bytes, other.gpu_memory_bytes);
        Self::saturating_accumulate(&mut self.gpu_compute_timens, other.gpu_compute_timens);
    }

    /// Adds `delta` to `total` with saturation; does nothing when `delta` is `None`.
    fn saturating_accumulate(total: &mut Option<u64>, delta: Option<u64>) {
        if let Some(delta) = delta {
            *total = Some(total.unwrap_or(0).saturating_add(delta));
        }
    }
}
1641
1642#[cfg_attr(feature = "serialization", derive(Serialize, Deserialize))]
1644#[derive(Debug, Clone, Default)]
1645pub struct EnhancedSpanMetrics {
1646 pub basic: SpanMetrics,
1648 pub resources: ResourceAttribution,
1650 pub performance_counters: HashMap<String, u64>,
1652}
1653
1654impl EnhancedSpanMetrics {
1655 pub fn new() -> Self {
1656 Self::default()
1657 }
1658
1659 pub fn add_performance_counter(&mut self, name: &str, value: u64) {
1660 *self
1661 .performance_counters
1662 .entry(name.to_string())
1663 .or_insert(0) += value;
1664 }
1665
1666 pub fn get_total_resource_cost(&self) -> f64 {
1667 let mut cost = 0.0;
1668
1669 if let Some(cpu_ns) = self.resources.cpu_timens {
1671 cost += cpu_ns as f64 / 1_000_000.0; }
1673
1674 if let Some(mem) = self.resources.memory_allocated_bytes {
1676 cost += mem as f64 / 1_048_576.0; }
1678
1679 if let Some(io) = self.resources.io_operations {
1681 cost += io as f64;
1682 }
1683
1684 cost
1685 }
1686}
1687
/// Registers the tracing metrics with the global metrics registry.
///
/// Only compiled when the `observability` feature is enabled. Metrics are
/// registered in this order:
/// - counters `tracing_spans_started`, `tracing_spans_completed` and
///   `tracing_spans_failed`,
/// - gauge `tracing_active_spans`,
/// - histogram `tracing_span_duration` with buckets
///   `[0.001, 0.01, 0.1, 1.0, 10.0]`.
///
/// # Errors
///
/// Stops at, and returns, the first error reported by the registry's
/// `register` call.
#[cfg(feature = "observability")]
#[allow(dead_code)]
pub fn integrate_with_metrics_system() -> Result<(), CoreError> {
    use crate::metrics::{Counter, Gauge, Histogram};

    let registry = crate::metrics::global_metrics_registry();

    let counter_names = [
        "tracing_spans_started",
        "tracing_spans_completed",
        "tracing_spans_failed",
    ];
    for counter_name in counter_names {
        registry.register(
            counter_name.to_string(),
            Counter::new(counter_name.to_string()),
        )?;
    }

    let gauge_name = "tracing_active_spans";
    registry.register(gauge_name.to_string(), Gauge::new(gauge_name.to_string()))?;

    let histogram_name = "tracing_span_duration";
    let duration_buckets = vec![0.001, 0.01, 0.1, 1.0, 10.0];
    registry.register(
        histogram_name.to_string(),
        Histogram::with_buckets(histogram_name.to_string(), duration_buckets),
    )?;

    Ok(())
}
1724
1725#[allow(dead_code)]
1727pub fn examplematrix_computation_with_tracing() -> Result<(), CoreError> {
1728 let config = TracingConfig {
1730 service_name: "matrix_computation_service".to_string(),
1731 samplingrate: 1.0, enable_performance_attribution: true,
1733 enable_distributed_context: true,
1734 ..TracingConfig::default()
1735 };
1736
1737 let tracing = TracingSystem::new(config)?;
1738 let _adaptive_sampler = AdaptiveSampler::new(0.1, 1000.0); let batch_exporter = BatchExporter::new(
1740 Box::new(ConsoleExporter::new(true)),
1741 50, Duration::from_secs(5), );
1744
1745 let tracing = tracing.with_exporter(Box::new(batch_exporter));
1746
1747 let computation_span = tracing.start_span("matrix_multiplication")?;
1749 computation_span.add_attribute("matrix_size", "1000x1000")?;
1750 computation_span.add_attribute("algorithm", "block_multiplication")?;
1751
1752 let _result = computation_span.in_span(|| {
1753 let alloc_span = tracing.start_span("memory_allocation")?;
1755 alloc_span.add_attribute("allocation_size", "8MB")?;
1756
1757 let _memory_result = alloc_span.in_span(|| {
1758 std::thread::sleep(Duration::from_millis(10));
1760 "allocated"
1761 });
1762
1763 let compute_span = tracing.start_span("matrix_compute")?;
1765 compute_span.add_metric("flops", 2_000_000_000.0)?; let _compute_result = compute_span.in_span(|| {
1768 std::thread::sleep(Duration::from_millis(100));
1770 "computed"
1771 });
1772
1773 Ok::<_, CoreError>("matrix_result")
1774 })?;
1775
1776 computation_span.add_attribute("result_status", "success")?;
1777 computation_span.end();
1778
1779 tracing.flush()?;
1781
1782 Ok(())
1783}
1784
1785#[cfg(test)]
1786#[path = "tracing_tests.rs"]
1787mod tests;