Skip to main content

ash_rpc/observability/
tracing.rs

1//! OpenTelemetry tracing for JSON-RPC
2
3use crate::Message;
4use opentelemetry::{
5    KeyValue, global,
6    trace::{Span, Status, Tracer},
7};
8
/// Tracing processor for OpenTelemetry integration.
///
/// Stores the service name that is handed to the global tracer provider
/// every time a span is started for a JSON-RPC message.
#[derive(Debug, Clone)]
pub struct TracingProcessor {
    /// Name used to look up the tracer from the global tracer provider.
    service_name: String,
}
13
14impl TracingProcessor {
15    /// Create a new tracing processor with the given service name
16    pub fn new(service_name: impl Into<String>) -> Self {
17        Self {
18            service_name: service_name.into(),
19        }
20    }
21
22    /// Create a tracer from the global tracer provider
23    #[must_use]
24    pub fn from_global(service_name: &str) -> Self {
25        Self::new(service_name)
26    }
27
28    /// Start a span for a message
29    #[must_use]
30    pub fn start_span(&self, message: &Message) -> Option<SpanGuard> {
31        let (span_name, method_name) = match message {
32            Message::Request(req) => (format!("jsonrpc.{}", req.method), Some(req.method.clone())),
33            Message::Notification(notif) => (
34                format!("jsonrpc.{}", notif.method),
35                Some(notif.method.clone()),
36            ),
37            Message::Response(_) => ("jsonrpc.response".to_owned(), None),
38        };
39
40        let tracer = global::tracer(self.service_name.clone());
41        let mut span = tracer.start(span_name.clone());
42
43        span.set_attribute(KeyValue::new("rpc.system", "jsonrpc"));
44        span.set_attribute(KeyValue::new("rpc.jsonrpc.version", "2.0"));
45
46        if let Some(method) = method_name {
47            span.set_attribute(KeyValue::new("rpc.method", method));
48        }
49
50        match message {
51            Message::Request(req) => {
52                if let Some(id) = &req.id {
53                    span.set_attribute(KeyValue::new("rpc.jsonrpc.request_id", id.to_string()));
54                }
55            }
56            Message::Notification(_) => {
57                span.set_attribute(KeyValue::new("rpc.jsonrpc.notification", true));
58            }
59            Message::Response(resp) => {
60                if let Some(id) = &resp.id {
61                    span.set_attribute(KeyValue::new("rpc.jsonrpc.request_id", id.to_string()));
62                }
63                if resp.is_error() {
64                    span.set_attribute(KeyValue::new("rpc.jsonrpc.error", true));
65                }
66            }
67        }
68
69        Some(SpanGuard { span })
70    }
71
72    /// Extract trace context from request parameters
73    /// Looks for a `_trace_context` field in params
74    #[must_use]
75    pub fn extract_context(params: &Option<serde_json::Value>) -> Option<opentelemetry::Context> {
76        if let Some(serde_json::Value::Object(map)) = params
77            && let Some(trace_ctx) = map.get("_trace_context")
78        {
79            // Try to extract traceparent header format
80            if let Some(traceparent) = trace_ctx.get("traceparent").and_then(|v| v.as_str()) {
81                // Parse W3C traceparent format
82                // Format: 00-{trace_id}-{span_id}-{flags}
83                return Self::parse_traceparent(traceparent);
84            }
85        }
86        None
87    }
88
89    /// Parse W3C traceparent header
90    fn parse_traceparent(_traceparent: &str) -> Option<opentelemetry::Context> {
91        // Simplified implementation - in production use proper W3C parser
92        // For now, return None to use automatic context propagation
93        None
94    }
95}
96
97/// Guard that records span end when dropped
98pub struct SpanGuard {
99    span: global::BoxedSpan,
100}
101
102impl SpanGuard {
103    /// Record an error in the span
104    pub fn record_error(&mut self) {
105        self.span.set_status(Status::error("Request failed"));
106    }
107
108    /// Add an event to the span
109    pub fn add_event(
110        &mut self,
111        name: impl Into<std::borrow::Cow<'static, str>>,
112        attributes: Vec<KeyValue>,
113    ) {
114        self.span.add_event(name, attributes);
115    }
116
117    /// Set a span attribute
118    pub fn set_attribute(&mut self, kv: KeyValue) {
119        self.span.set_attribute(kv);
120    }
121}
122
123impl Drop for SpanGuard {
124    fn drop(&mut self) {
125        self.span.end();
126    }
127}
128
/// Builder for creating a tracing processor with custom configuration.
#[derive(Debug, Clone)]
pub struct TracingBuilder {
    /// Service name forwarded to the processor on `build`.
    service_name: String,
    /// Optional service version recorded by the builder.
    service_version: Option<String>,
}
134
135impl TracingBuilder {
136    /// Create a new builder with service name
137    pub fn new(service_name: impl Into<String>) -> Self {
138        Self {
139            service_name: service_name.into(),
140            service_version: None,
141        }
142    }
143
144    /// Set service version
145    #[must_use]
146    pub fn service_version(mut self, version: impl Into<String>) -> Self {
147        self.service_version = Some(version.into());
148        self
149    }
150
151    /// Build using the global tracer
152    #[must_use]
153    pub fn build(self) -> TracingProcessor {
154        TracingProcessor::from_global(&self.service_name)
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder accepts a version and produces a processor.
    #[test]
    fn test_tracing_builder() {
        let builder = TracingBuilder::new("test-service").service_version("1.0.0");
        let _processor = builder.build();
    }

    /// Absent params never yield a context.
    #[test]
    fn test_extract_context_missing() {
        assert!(TracingProcessor::extract_context(&None).is_none());
    }

    /// A well-formed `_trace_context` payload is accepted without panicking.
    #[test]
    fn test_extract_context_with_traceparent() {
        let params = Some(serde_json::json!({
            "_trace_context": {
                "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
            }
        }));
        // The result is deliberately not asserted; this only exercises the lookup path.
        let _ctx = TracingProcessor::extract_context(&params);
    }
}