use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
use uuid::Uuid;
/// Identifiers and flags that tie a span to its trace and are handed down to child spans.
///
/// A value is created fresh with `TraceContext::new`, derived from a parent with
/// `TraceContext::create_child`, or parsed from a header with
/// `TraceContext::from_traceparent`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceContext {
/// Identifier of the trace; child contexts copy it unchanged from their parent.
pub trace_id: String,
/// Identifier of the span this context describes.
pub span_id: String,
/// `span_id` of the context this one was derived from; `None` for contexts built by
/// `new` or `from_traceparent`.
pub parent_span_id: Option<String>,
/// Flag byte; bit `0x01` is the one `is_sampled` tests.
pub trace_flags: u8,
/// String key/value pairs copied into child contexts; empty for contexts built by
/// `new` or `from_traceparent`.
pub trace_state: HashMap<String, String>,
}
impl TraceContext {
    /// Number of hex digits in a `traceparent` trace id.
    const TRACE_ID_HEX_LEN: usize = 32;
    /// Number of hex digits in a `traceparent` span (parent) id.
    const SPAN_ID_HEX_LEN: usize = 16;

    /// Creates a root context with freshly generated trace and span ids.
    ///
    /// The result has no parent span, an empty trace state, and the sampled
    /// flag (`0x01`) set.
    pub fn new() -> Self {
        Self {
            trace_id: generate_trace_id(),
            span_id: generate_span_id(),
            parent_span_id: None,
            trace_flags: 0x01,
            trace_state: HashMap::new(),
        }
    }

    /// Derives a context for a child span.
    ///
    /// The child keeps this context's trace id, flags and trace state, gets a
    /// newly generated span id, and records this context's span id as its parent.
    pub fn create_child(&self) -> Self {
        Self {
            trace_id: self.trace_id.clone(),
            span_id: generate_span_id(),
            parent_span_id: Some(self.span_id.clone()),
            trace_flags: self.trace_flags,
            trace_state: self.trace_state.clone(),
        }
    }

    /// Returns `true` when the sampled bit (`0x01`) of `trace_flags` is set.
    pub fn is_sampled(&self) -> bool {
        (self.trace_flags & 0x01) != 0
    }

    /// Renders the context as a version-`00` `traceparent` header value:
    /// `00-<trace_id>-<span_id>-<flags as two hex digits>`.
    pub fn to_traceparent(&self) -> String {
        format!(
            "00-{}-{}-{:02x}",
            self.trace_id, self.span_id, self.trace_flags
        )
    }

    /// Parses a version-`00` `traceparent` header value.
    ///
    /// The returned context has no parent span id and an empty trace state.
    ///
    /// # Errors
    ///
    /// Returns an error when the header
    /// * does not consist of exactly four `-`-separated fields,
    /// * has a version other than `00`,
    /// * has a trace id that is not 32 lowercase hex digits or is all zeros,
    /// * has a span id that is not 16 lowercase hex digits or is all zeros, or
    /// * has flags that are not exactly two lowercase hex digits.
    pub fn from_traceparent(header: &str) -> Result<Self> {
        let parts: Vec<&str> = header.split('-').collect();
        let (version, trace_id, span_id, flags) = match parts.as_slice() {
            [version, trace_id, span_id, flags] => (*version, *trace_id, *span_id, *flags),
            _ => return Err(anyhow!("Invalid traceparent format")),
        };
        if version != "00" {
            return Err(anyhow!("Unsupported trace version"));
        }
        // The header comes from a remote peer, so the ids are validated instead
        // of being stored verbatim and propagated downstream.
        if !Self::is_valid_id(trace_id, Self::TRACE_ID_HEX_LEN) {
            return Err(anyhow!(
                "Invalid trace id: expected 32 lowercase hex digits, not all zero"
            ));
        }
        if !Self::is_valid_id(span_id, Self::SPAN_ID_HEX_LEN) {
            return Err(anyhow!(
                "Invalid span id: expected 16 lowercase hex digits, not all zero"
            ));
        }
        // `from_str_radix` alone would also accept inputs such as "1" or "+1".
        if flags.len() != 2 || !flags.bytes().all(Self::is_lower_hex) {
            return Err(anyhow!(
                "Invalid trace flags: expected two lowercase hex digits"
            ));
        }
        let trace_flags =
            u8::from_str_radix(flags, 16).map_err(|e| anyhow!("Invalid trace flags: {}", e))?;
        Ok(Self {
            trace_id: trace_id.to_string(),
            span_id: span_id.to_string(),
            parent_span_id: None,
            trace_flags,
            trace_state: HashMap::new(),
        })
    }

    /// Returns `true` for the ASCII bytes `0-9` and `a-f`.
    fn is_lower_hex(byte: u8) -> bool {
        matches!(byte, b'0'..=b'9' | b'a'..=b'f')
    }

    /// Checks that `id` is exactly `expected_len` lowercase hex digits and not all zeros.
    fn is_valid_id(id: &str, expected_len: usize) -> bool {
        id.len() == expected_len
            && id.bytes().all(Self::is_lower_hex)
            && id.bytes().any(|b| b != b'0')
    }
}
impl Default for TraceContext {
fn default() -> Self {
Self::new()
}
}
/// Status recorded on a `Span`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum SpanStatus {
/// Success; supplied by the caller when the span is ended.
Ok,
/// Failure; supplied by the caller when the span is ended.
Error,
/// Initial status set by `Span::new`; stays until `Span::end` overwrites it.
InProgress,
}
/// Role of a span, chosen by the caller when the span is created.
///
/// NOTE(review): the variant names match the OpenTelemetry span kinds and presumably
/// carry the same meaning — confirm before relying on it.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum SpanKind {
Internal,
/// Kind that `DistributedTracer::start_trace` assigns to root spans.
Server,
Client,
Producer,
Consumer,
}
/// A single timed operation within a trace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Span {
/// Identifier of this span.
pub span_id: String,
/// Identifier of the parent span, if any.
pub parent_span_id: Option<String>,
/// Caller-supplied name of the operation.
pub name: String,
/// Role of the span.
pub kind: SpanKind,
/// Wall-clock time captured by `Span::new`.
pub start_time: SystemTime,
/// Wall-clock time captured by `Span::end`; `None` until the span is ended.
pub end_time: Option<SystemTime>,
/// `end_time - start_time`, set by `Span::end` when that difference can be computed.
pub duration: Option<Duration>,
/// `InProgress` after `Span::new`, then whatever status is passed to `Span::end`.
pub status: SpanStatus,
/// String key/value attributes written through `Span::set_attribute`.
pub attributes: HashMap<String, String>,
/// Events appended through `Span::add_event`, in insertion order.
pub events: Vec<SpanEvent>,
/// Name of the service that produced the span.
pub service_name: String,
/// Subgraph the span is attributed to; `None` after `Span::new`, and used as the
/// grouping key by `Trace::subgraph_breakdown`.
pub subgraph_name: Option<String>,
}
impl Span {
    /// Builds a span that starts now.
    ///
    /// The new span is `InProgress`, has no end time or duration, carries no
    /// attributes or events, and is not assigned to any subgraph.
    pub fn new(
        span_id: String,
        parent_span_id: Option<String>,
        name: String,
        kind: SpanKind,
        service_name: String,
    ) -> Self {
        let started_at = SystemTime::now();
        Self {
            span_id,
            parent_span_id,
            name,
            kind,
            service_name,
            subgraph_name: None,
            status: SpanStatus::InProgress,
            start_time: started_at,
            end_time: None,
            duration: None,
            attributes: HashMap::default(),
            events: Vec::default(),
        }
    }

    /// Stores `value` under `key`, replacing any earlier value for that key.
    pub fn set_attribute(&mut self, key: String, value: String) {
        self.attributes.insert(key, value);
    }

    /// Appends an event stamped with the current wall-clock time.
    pub fn add_event(&mut self, name: String, attributes: HashMap<String, String>) {
        let event = SpanEvent {
            timestamp: SystemTime::now(),
            name,
            attributes,
        };
        self.events.push(event);
    }

    /// Marks the span as finished now with the given `status`.
    ///
    /// The duration is updated only when the end time is not earlier than the
    /// start time; otherwise whatever duration was stored before is left as is.
    pub fn end(&mut self, status: SpanStatus) {
        let finished_at = SystemTime::now();
        self.status = status;
        self.end_time = Some(finished_at);
        // `duration_since` fails if the wall clock moved backwards; in that case
        // the previously stored value is kept.
        self.duration = finished_at
            .duration_since(self.start_time)
            .ok()
            .or(self.duration);
    }

    /// Returns `true` once an end time has been recorded.
    pub fn is_finished(&self) -> bool {
        self.end_time.is_some()
    }
}
/// A named, timestamped occurrence attached to a `Span`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpanEvent {
/// Wall-clock time of the event; `Span::add_event` sets it to the current time.
pub timestamp: SystemTime,
/// Caller-supplied event name.
pub name: String,
/// Caller-supplied string key/value attributes.
pub attributes: HashMap<String, String>,
}
/// A root span together with the other spans collected for the same trace id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Trace {
/// Identifier of the trace.
pub trace_id: String,
/// Span the trace was created with.
pub root_span: Span,
/// Spans added through `Trace::add_span`; the root span is not part of this list.
pub spans: Vec<Span>,
/// Copied from the root span's `start_time` by `Trace::new`.
pub start_time: SystemTime,
/// Wall-clock time captured by `Trace::end`; `None` until then.
pub end_time: Option<SystemTime>,
/// `end_time - start_time`, set by `Trace::end` when that difference can be computed.
pub duration: Option<Duration>,
/// String key/value attributes of the trace; initialised empty by `Trace::new`.
pub attributes: HashMap<String, String>,
}
impl Trace {
    /// Creates a trace around `root_span`.
    ///
    /// The trace starts at the root span's start time and has no further
    /// spans, no end time, no duration and no attributes.
    pub fn new(trace_id: String, root_span: Span) -> Self {
        Self {
            // Read the (Copy) start time before `root_span` is moved below.
            start_time: root_span.start_time,
            trace_id,
            root_span,
            spans: Vec::new(),
            end_time: None,
            duration: None,
            attributes: HashMap::new(),
        }
    }

    /// Appends `span` to the non-root spans of this trace.
    pub fn add_span(&mut self, span: Span) {
        self.spans.push(span);
    }

    /// Marks the trace as finished now.
    ///
    /// Only the trace's own end time and duration are touched; the spans it
    /// holds are left unchanged. The duration is updated only when the end
    /// time is not earlier than the start time.
    pub fn end(&mut self) {
        let finished_at = SystemTime::now();
        self.end_time = Some(finished_at);
        self.duration = finished_at
            .duration_since(self.start_time)
            .ok()
            .or(self.duration);
    }

    /// Returns the root span followed by every added span, in insertion order.
    pub fn all_spans(&self) -> Vec<&Span> {
        std::iter::once(&self.root_span)
            .chain(&self.spans)
            .collect()
    }

    /// Sums span durations per subgraph name.
    ///
    /// Spans without a subgraph name or without a duration are skipped.
    pub fn subgraph_breakdown(&self) -> HashMap<String, Duration> {
        let timed_spans = self.all_spans().into_iter().filter_map(|span| {
            let subgraph = span.subgraph_name.as_ref()?;
            let elapsed = span.duration?;
            Some((subgraph, elapsed))
        });
        let mut totals: HashMap<String, Duration> = HashMap::new();
        for (subgraph, elapsed) in timed_spans {
            *totals.entry(subgraph.clone()).or_insert(Duration::ZERO) += elapsed;
        }
        totals
    }

    /// Returns all spans ordered by start time (stable for equal start times).
    ///
    /// NOTE(review): despite the name, no dependency or longest-path analysis
    /// is performed here; the result is simply a chronological listing.
    pub fn critical_path(&self) -> Vec<&Span> {
        let mut ordered = self.all_spans();
        ordered.sort_by(|left, right| left.start_time.cmp(&right.start_time));
        ordered
    }
}
/// Settings for `DistributedTracer`.
#[derive(Debug, Clone)]
pub struct TracingConfig {
/// Master switch; when `false` the tracer's start/end methods record nothing.
pub enabled: bool,
/// Threshold compared against a random number drawn from `0.0..1.0` when a trace is
/// started; the trace is recorded when the random number is below it.
pub sample_rate: f64,
/// Service name stamped on every span the tracer creates.
pub service_name: String,
/// When `true`, `DistributedTracer::end_trace` hands the finished trace to the exporter.
pub export_enabled: bool,
/// Destination for exported traces; `None` disables the export step's only action.
pub export_endpoint: Option<String>,
/// Intended cap on the number of spans kept per trace.
pub max_spans_per_trace: usize,
/// Intended maximum lifetime of a trace.
/// NOTE(review): no code visible in this file reads this field — confirm where it is enforced.
pub max_trace_duration: Duration,
}
impl Default for TracingConfig {
fn default() -> Self {
Self {
enabled: true,
sample_rate: 1.0, service_name: "oxirs-gql".to_string(),
export_enabled: false,
export_endpoint: None,
max_spans_per_trace: 1000,
max_trace_duration: Duration::from_secs(60),
}
}
}
/// In-memory tracer that tracks active traces and spans and keeps finished traces.
pub struct DistributedTracer {
/// Settings supplied to `DistributedTracer::new`.
config: TracingConfig,
/// Traces opened by `start_trace` and not yet ended, keyed by trace id.
active_traces: Arc<RwLock<HashMap<String, Trace>>>,
/// Traces finished by `end_trace`, oldest first.
completed_traces: Arc<RwLock<Vec<Trace>>>,
/// Spans registered by `start_span` and not yet ended, keyed by span id.
active_spans: Arc<RwLock<HashMap<String, Span>>>,
}
impl DistributedTracer {
    /// Number of completed traces that may be held before the oldest are evicted.
    const MAX_COMPLETED_TRACES: usize = 1000;
    /// How many of the oldest completed traces are discarded per eviction.
    const COMPLETED_EVICTION_BATCH: usize = 500;

    /// Creates a tracer with the given configuration and no recorded data.
    pub fn new(config: TracingConfig) -> Self {
        Self {
            config,
            active_traces: Arc::new(RwLock::new(HashMap::new())),
            completed_traces: Arc::new(RwLock::new(Vec::new())),
            active_spans: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Opens a trace for `context` and returns its root span (kind `Server`).
    ///
    /// The root span is always returned. The trace is registered as active only
    /// when tracing is enabled and the sampling draw succeeds; registering
    /// replaces any active trace with the same trace id.
    ///
    /// NOTE(review): `start_span` honours `context.is_sampled()` whereas this
    /// method does not consult it — confirm whether a context that arrives
    /// unsampled should still open a trace here.
    pub async fn start_trace(&self, context: &TraceContext, name: String) -> Result<Span> {
        let root_span = Span::new(
            context.span_id.clone(),
            context.parent_span_id.clone(),
            name,
            SpanKind::Server,
            self.config.service_name.clone(),
        );
        if !self.config.enabled || !self.should_sample() {
            return Ok(root_span);
        }
        let trace = Trace::new(context.trace_id.clone(), root_span.clone());
        self.active_traces
            .write()
            .await
            .insert(context.trace_id.clone(), trace);
        Ok(root_span)
    }

    /// Creates a span for `context` and returns a copy of it.
    ///
    /// The span is registered as active (keyed by its span id) only when
    /// tracing is enabled and the context is sampled; otherwise it is returned
    /// without being tracked.
    pub async fn start_span(
        &self,
        context: &TraceContext,
        name: String,
        kind: SpanKind,
        subgraph_name: Option<String>,
    ) -> Result<Span> {
        let mut span = Span::new(
            context.span_id.clone(),
            context.parent_span_id.clone(),
            name,
            kind,
            self.config.service_name.clone(),
        );
        span.subgraph_name = subgraph_name;
        if self.config.enabled && context.is_sampled() {
            self.active_spans
                .write()
                .await
                .insert(context.span_id.clone(), span.clone());
        }
        Ok(span)
    }

    /// Ends the active span `span_id` with `status` and files it under `trace_id`.
    ///
    /// Does nothing when tracing is disabled or the span is not active. The
    /// ended span is discarded when the trace is not active, or when the trace
    /// already holds `max_spans_per_trace` spans (the root span counts).
    pub async fn end_span(&self, trace_id: &str, span_id: &str, status: SpanStatus) -> Result<()> {
        if !self.config.enabled {
            return Ok(());
        }
        // The write guard is a temporary, so the span map is unlocked before
        // the trace map is locked below.
        let removed = self.active_spans.write().await.remove(span_id);
        let mut span = match removed {
            Some(span) => span,
            None => return Ok(()),
        };
        span.end(status);
        let mut traces = self.active_traces.write().await;
        if let Some(trace) = traces.get_mut(trace_id) {
            // Enforce the configured cap so one trace cannot grow without bound.
            if trace.spans.len().saturating_add(1) < self.config.max_spans_per_trace {
                trace.add_span(span);
            } else {
                tracing::debug!(
                    "Dropping span {} of trace {}: max_spans_per_trace ({}) reached",
                    span.span_id,
                    trace_id,
                    self.config.max_spans_per_trace
                );
            }
        }
        Ok(())
    }

    /// Ends the active trace `trace_id` and moves it to the completed list.
    ///
    /// Does nothing when tracing is disabled or the trace is not active. When
    /// export is enabled the trace is exported first; the trace is recorded as
    /// completed even if the export fails, and the export error is returned
    /// afterwards. Once more than 1000 completed traces are held, the 500
    /// oldest are discarded.
    ///
    /// NOTE(review): the root span stored inside the trace is not ended by this
    /// path, so it stays `InProgress` in completed traces — confirm whether
    /// that is intended.
    pub async fn end_trace(&self, trace_id: &str) -> Result<()> {
        if !self.config.enabled {
            return Ok(());
        }
        let removed = self.active_traces.write().await.remove(trace_id);
        let mut trace = match removed {
            Some(trace) => trace,
            None => return Ok(()),
        };
        trace.end();
        // Defer the export error: the trace has already left the active map and
        // must not be lost if the exporter fails.
        let export_result = if self.config.export_enabled {
            self.export_trace(&trace).await
        } else {
            Ok(())
        };
        {
            let mut completed = self.completed_traces.write().await;
            completed.push(trace);
            if completed.len() > Self::MAX_COMPLETED_TRACES {
                completed.drain(..Self::COMPLETED_EVICTION_BATCH);
            }
        }
        export_result
    }

    /// Returns a copy of the trace with the given id, looking at active traces
    /// first and completed traces second.
    pub async fn get_trace(&self, trace_id: &str) -> Option<Trace> {
        {
            let traces = self.active_traces.read().await;
            if let Some(trace) = traces.get(trace_id) {
                return Some(trace.clone());
            }
        }
        let completed = self.completed_traces.read().await;
        completed.iter().find(|t| t.trace_id == trace_id).cloned()
    }

    /// Returns copies of all completed traces currently held, oldest first.
    pub async fn get_completed_traces(&self) -> Vec<Trace> {
        let completed = self.completed_traces.read().await;
        completed.clone()
    }

    /// Draws a random number from `0.0..1.0` and reports whether it is below
    /// the configured sample rate.
    fn should_sample(&self) -> bool {
        use scirs2_core::random::{rng, RngExt};
        let mut rng = rng();
        rng.random_range(0.0..1.0) < self.config.sample_rate
    }

    /// Export hook; currently only logs the trace id and endpoint at debug
    /// level when an endpoint is configured, and always succeeds.
    async fn export_trace(&self, trace: &Trace) -> Result<()> {
        if let Some(endpoint) = &self.config.export_endpoint {
            tracing::debug!(
                "Exporting trace {} to {} (not implemented)",
                trace.trace_id,
                endpoint
            );
        }
        Ok(())
    }

    /// Returns counts of active and completed traces and the average duration
    /// of completed traces.
    ///
    /// The average is taken over completed traces that have a duration; it is
    /// `Duration::ZERO` when there are none.
    pub async fn get_stats(&self) -> TracingStats {
        let active_traces = self.active_traces.read().await;
        let completed_traces = self.completed_traces.read().await;
        // Sum and count in one pass so the divisor matches the traces that
        // actually contributed a duration.
        let (total, measured) = completed_traces
            .iter()
            .filter_map(|trace| trace.duration)
            .fold((Duration::ZERO, 0u32), |(sum, count), elapsed| {
                (sum + elapsed, count.saturating_add(1))
            });
        let avg_duration = if measured == 0 {
            Duration::ZERO
        } else {
            total / measured
        };
        TracingStats {
            active_traces: active_traces.len(),
            completed_traces: completed_traces.len(),
            total_traces: active_traces.len() + completed_traces.len(),
            avg_trace_duration: avg_duration,
        }
    }
}
/// Snapshot of tracer counters returned by `DistributedTracer::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TracingStats {
/// Number of traces that were active when the snapshot was taken.
pub active_traces: usize,
/// Number of completed traces held when the snapshot was taken.
pub completed_traces: usize,
/// Sum of `active_traces` and `completed_traces`.
pub total_traces: usize,
/// Average duration of completed traces; `Duration::ZERO` when there are none.
pub avg_trace_duration: Duration,
}
/// Produces a trace id: the 128 bits of a random (v4) UUID rendered as
/// 32 zero-padded lowercase hex digits.
fn generate_trace_id() -> String {
    let raw: u128 = Uuid::new_v4().as_u128();
    format!("{:032x}", raw)
}
/// Produces a span id: the low 64 bits of a random (v4) UUID rendered as
/// 16 zero-padded lowercase hex digits.
fn generate_span_id() -> String {
    const LOW_64_BITS: u128 = 0xFFFF_FFFF_FFFF_FFFF;
    let raw = Uuid::new_v4().as_u128();
    let low_bits = raw & LOW_64_BITS;
    format!("{:016x}", low_bits)
}
// Unit tests for the trace context, span/trace data types and the tracer.
#[cfg(test)]
mod tests {
use super::*;
// A fresh context has a 32-char trace id, a 16-char span id, no parent, and is sampled.
#[test]
fn test_trace_context_creation() {
let ctx = TraceContext::new();
assert_eq!(ctx.trace_id.len(), 32);
assert_eq!(ctx.span_id.len(), 16);
assert!(ctx.parent_span_id.is_none());
assert!(ctx.is_sampled());
}
// A child keeps the trace id, gets a new span id, and points at the parent's span id.
#[test]
fn test_trace_context_child() {
let parent = TraceContext::new();
let child = parent.create_child();
assert_eq!(child.trace_id, parent.trace_id);
assert_ne!(child.span_id, parent.span_id);
assert_eq!(child.parent_span_id, Some(parent.span_id.clone()));
}
// The rendered header starts with the version and has exactly three separators.
#[test]
fn test_traceparent_format() {
let ctx = TraceContext::new();
let header = ctx.to_traceparent();
assert!(header.starts_with("00-"));
assert_eq!(header.matches('-').count(), 3);
}
// Parsing a well-formed header recovers the ids and the sampled flag.
#[test]
fn test_traceparent_parse() {
let header = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01";
let ctx = TraceContext::from_traceparent(header).expect("should succeed");
assert_eq!(ctx.trace_id, "0af7651916cd43dd8448eb211c80319c");
assert_eq!(ctx.span_id, "b7ad6b7169203331");
assert_eq!(ctx.trace_flags, 0x01);
assert!(ctx.is_sampled());
}
// A span starts in progress, accepts attributes and events, and is finished by `end`.
#[test]
fn test_span_lifecycle() {
let mut span = Span::new(
"span123".to_string(),
None,
"test_span".to_string(),
SpanKind::Internal,
"test_service".to_string(),
);
assert_eq!(span.status, SpanStatus::InProgress);
assert!(!span.is_finished());
span.set_attribute("key".to_string(), "value".to_string());
assert_eq!(span.attributes.get("key"), Some(&"value".to_string()));
span.add_event("event1".to_string(), HashMap::new());
assert_eq!(span.events.len(), 1);
span.end(SpanStatus::Ok);
assert_eq!(span.status, SpanStatus::Ok);
assert!(span.is_finished());
assert!(span.duration.is_some());
}
// A new trace keeps its id, has no extra spans, and has no duration yet.
#[test]
fn test_trace_creation() {
let root_span = Span::new(
"root".to_string(),
None,
"root_span".to_string(),
SpanKind::Server,
"service".to_string(),
);
let trace = Trace::new("trace123".to_string(), root_span);
assert_eq!(trace.trace_id, "trace123");
assert_eq!(trace.spans.len(), 0);
assert!(trace.duration.is_none());
}
// Added spans are stored separately from the root; `all_spans` reports root plus children.
#[test]
fn test_trace_add_spans() {
let root_span = Span::new(
"root".to_string(),
None,
"root".to_string(),
SpanKind::Server,
"service".to_string(),
);
let mut trace = Trace::new("trace123".to_string(), root_span);
let child_span = Span::new(
"child1".to_string(),
Some("root".to_string()),
"child".to_string(),
SpanKind::Client,
"service".to_string(),
);
trace.add_span(child_span);
assert_eq!(trace.spans.len(), 1);
assert_eq!(trace.all_spans().len(), 2); }
// With the default config, starting a trace returns a Server root span and registers the trace.
#[tokio::test]
async fn test_tracer_start_trace() {
let config = TracingConfig::default();
let tracer = DistributedTracer::new(config);
let ctx = TraceContext::new();
let span = tracer
.start_trace(&ctx, "test_query".to_string())
.await
.expect("should succeed");
assert_eq!(span.name, "test_query");
assert_eq!(span.kind, SpanKind::Server);
let trace = tracer.get_trace(&ctx.trace_id).await;
assert!(trace.is_some());
}
// A span started from a child context carries the name, subgraph and parent span id.
#[tokio::test]
async fn test_tracer_child_span() {
let config = TracingConfig::default();
let tracer = DistributedTracer::new(config);
let ctx = TraceContext::new();
tracer
.start_trace(&ctx, "root".to_string())
.await
.expect("should succeed");
let child_ctx = ctx.create_child();
let child_span = tracer
.start_span(
&child_ctx,
"child_query".to_string(),
SpanKind::Client,
Some("subgraph1".to_string()),
)
.await
.expect("should succeed");
assert_eq!(child_span.name, "child_query");
assert_eq!(child_span.subgraph_name, Some("subgraph1".to_string()));
assert_eq!(child_span.parent_span_id, Some(ctx.span_id.clone()));
}
// Ending an active span attaches it to its still-active trace.
#[tokio::test]
async fn test_tracer_end_span() {
let config = TracingConfig::default();
let tracer = DistributedTracer::new(config);
let ctx = TraceContext::new();
tracer
.start_trace(&ctx, "root".to_string())
.await
.expect("should succeed");
let child_ctx = ctx.create_child();
tracer
.start_span(&child_ctx, "child".to_string(), SpanKind::Client, None)
.await
.expect("should succeed");
tracer
.end_span(&ctx.trace_id, &child_ctx.span_id, SpanStatus::Ok)
.await
.expect("should succeed");
let trace = tracer
.get_trace(&ctx.trace_id)
.await
.expect("should succeed");
assert_eq!(trace.spans.len(), 1);
}
// Ending a trace moves it into the completed list.
#[tokio::test]
async fn test_tracer_end_trace() {
let config = TracingConfig::default();
let tracer = DistributedTracer::new(config);
let ctx = TraceContext::new();
tracer
.start_trace(&ctx, "root".to_string())
.await
.expect("should succeed");
tracer
.end_trace(&ctx.trace_id)
.await
.expect("should succeed");
let completed = tracer.get_completed_traces().await;
assert_eq!(completed.len(), 1);
assert_eq!(completed[0].trace_id, ctx.trace_id);
}
// A disabled tracer still returns a span but does not register the trace.
#[tokio::test]
async fn test_tracer_disabled() {
let config = TracingConfig {
enabled: false,
..Default::default()
};
let tracer = DistributedTracer::new(config);
let ctx = TraceContext::new();
let span = tracer
.start_trace(&ctx, "test".to_string())
.await
.expect("should succeed");
assert_eq!(span.name, "test");
let trace = tracer.get_trace(&ctx.trace_id).await;
assert!(trace.is_none());
}
// Stats count one active and one completed trace after two starts and one end.
#[tokio::test]
async fn test_get_stats() {
let config = TracingConfig::default();
let tracer = DistributedTracer::new(config);
let ctx1 = TraceContext::new();
tracer
.start_trace(&ctx1, "trace1".to_string())
.await
.expect("should succeed");
let ctx2 = TraceContext::new();
tracer
.start_trace(&ctx2, "trace2".to_string())
.await
.expect("should succeed");
tracer
.end_trace(&ctx1.trace_id)
.await
.expect("should succeed");
let stats = tracer.get_stats().await;
assert_eq!(stats.active_traces, 1);
assert_eq!(stats.completed_traces, 1);
assert_eq!(stats.total_traces, 2);
}
// Distinct span kinds have distinct discriminants.
// NOTE(review): the first assertion compares a value with itself, so only the second can fail.
#[test]
fn test_span_kind_variants() {
assert_eq!(SpanKind::Internal as i32, SpanKind::Internal as i32);
assert_ne!(SpanKind::Client as i32, SpanKind::Server as i32);
}
// Span statuses compare equal to themselves and unequal to each other.
#[test]
fn test_span_status_variants() {
assert_eq!(SpanStatus::Ok, SpanStatus::Ok);
assert_ne!(SpanStatus::Ok, SpanStatus::Error);
assert_ne!(SpanStatus::Error, SpanStatus::InProgress);
}
}