ash_rpc/observability/
tracing.rs1use crate::Message;
4use opentelemetry::{
5 KeyValue, global,
6 trace::{Span, Status, Tracer},
7};
8
/// Creates OpenTelemetry spans for JSON-RPC messages.
///
/// The processor only stores the service name; the tracer itself is looked up
/// from the global tracer provider each time a span is started.
#[derive(Debug, Clone)]
pub struct TracingProcessor {
    /// Name handed to `global::tracer` when a span is started.
    service_name: String,
}
13
14impl TracingProcessor {
15 pub fn new(service_name: impl Into<String>) -> Self {
17 Self {
18 service_name: service_name.into(),
19 }
20 }
21
22 #[must_use]
24 pub fn from_global(service_name: &str) -> Self {
25 Self::new(service_name)
26 }
27
28 #[must_use]
30 pub fn start_span(&self, message: &Message) -> Option<SpanGuard> {
31 let (span_name, method_name) = match message {
32 Message::Request(req) => (format!("jsonrpc.{}", req.method), Some(req.method.clone())),
33 Message::Notification(notif) => (
34 format!("jsonrpc.{}", notif.method),
35 Some(notif.method.clone()),
36 ),
37 Message::Response(_) => ("jsonrpc.response".to_owned(), None),
38 };
39
40 let tracer = global::tracer(self.service_name.clone());
41 let mut span = tracer.start(span_name.clone());
42
43 span.set_attribute(KeyValue::new("rpc.system", "jsonrpc"));
44 span.set_attribute(KeyValue::new("rpc.jsonrpc.version", "2.0"));
45
46 if let Some(method) = method_name {
47 span.set_attribute(KeyValue::new("rpc.method", method));
48 }
49
50 match message {
51 Message::Request(req) => {
52 if let Some(id) = &req.id {
53 span.set_attribute(KeyValue::new("rpc.jsonrpc.request_id", id.to_string()));
54 }
55 }
56 Message::Notification(_) => {
57 span.set_attribute(KeyValue::new("rpc.jsonrpc.notification", true));
58 }
59 Message::Response(resp) => {
60 if let Some(id) = &resp.id {
61 span.set_attribute(KeyValue::new("rpc.jsonrpc.request_id", id.to_string()));
62 }
63 if resp.is_error() {
64 span.set_attribute(KeyValue::new("rpc.jsonrpc.error", true));
65 }
66 }
67 }
68
69 Some(SpanGuard { span })
70 }
71
72 #[must_use]
75 pub fn extract_context(params: &Option<serde_json::Value>) -> Option<opentelemetry::Context> {
76 if let Some(serde_json::Value::Object(map)) = params
77 && let Some(trace_ctx) = map.get("_trace_context")
78 {
79 if let Some(traceparent) = trace_ctx.get("traceparent").and_then(|v| v.as_str()) {
81 return Self::parse_traceparent(traceparent);
84 }
85 }
86 None
87 }
88
89 fn parse_traceparent(_traceparent: &str) -> Option<opentelemetry::Context> {
91 None
94 }
95}
96
97pub struct SpanGuard {
99 span: global::BoxedSpan,
100}
101
102impl SpanGuard {
103 pub fn record_error(&mut self) {
105 self.span.set_status(Status::error("Request failed"));
106 }
107
108 pub fn add_event(
110 &mut self,
111 name: impl Into<std::borrow::Cow<'static, str>>,
112 attributes: Vec<KeyValue>,
113 ) {
114 self.span.add_event(name, attributes);
115 }
116
117 pub fn set_attribute(&mut self, kv: KeyValue) {
119 self.span.set_attribute(kv);
120 }
121}
122
123impl Drop for SpanGuard {
124 fn drop(&mut self) {
125 self.span.end();
126 }
127}
128
/// Builder for a [`TracingProcessor`].
#[derive(Debug, Clone)]
pub struct TracingBuilder {
    /// Service name passed on to the processor.
    service_name: String,
    /// Optional service version set through `service_version`.
    service_version: Option<String>,
}
134
135impl TracingBuilder {
136 pub fn new(service_name: impl Into<String>) -> Self {
138 Self {
139 service_name: service_name.into(),
140 service_version: None,
141 }
142 }
143
144 #[must_use]
146 pub fn service_version(mut self, version: impl Into<String>) -> Self {
147 self.service_version = Some(version.into());
148 self
149 }
150
151 #[must_use]
153 pub fn build(self) -> TracingProcessor {
154 TracingProcessor::from_global(&self.service_name)
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder chain compiles and produces a processor.
    #[test]
    fn test_tracing_builder() {
        let _processor = TracingBuilder::new("test-service")
            .service_version("1.0.0")
            .build();
    }

    /// Absent params carry no trace context.
    #[test]
    fn test_extract_context_missing() {
        let params = None;
        let ctx = TracingProcessor::extract_context(&params);
        assert!(ctx.is_none());
    }

    /// Object params without the `_trace_context` key carry no trace context.
    #[test]
    fn test_extract_context_without_trace_context_key() {
        let params = serde_json::json!({ "other": 1 });
        assert!(TracingProcessor::extract_context(&Some(params)).is_none());
    }

    /// Non-object params (e.g. positional arrays) carry no trace context.
    #[test]
    fn test_extract_context_non_object_params() {
        let params = serde_json::json!([1, 2, 3]);
        assert!(TracingProcessor::extract_context(&Some(params)).is_none());
    }

    /// A value that is not a well-formed `traceparent` yields no context.
    #[test]
    fn test_extract_context_malformed_traceparent() {
        let params = serde_json::json!({
            "_trace_context": {
                "traceparent": "not-a-traceparent"
            }
        });
        assert!(TracingProcessor::extract_context(&Some(params)).is_none());
    }

    /// Extraction with a well-formed `traceparent` must not panic.
    #[test]
    fn test_extract_context_with_traceparent() {
        let params = serde_json::json!({
            "_trace_context": {
                "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
            }
        });
        let _ctx = TracingProcessor::extract_context(&Some(params));
    }
}