ockam_core/routing/message/
opentelemetry.rs1use crate::errcode::{Kind, Origin};
2use core::fmt::{Display, Formatter};
3use core::str::FromStr;
4use opentelemetry::propagation::{Extractor, Injector};
5use opentelemetry::{global, Context};
6use serde::{Deserialize, Serialize};
7use std::cmp::Ordering;
8use std::collections::HashMap;
9use std::hash::{Hash, Hasher};
10use tracing_opentelemetry::OtelData;
11use tracing_subscriber::registry::LookupSpan;
12use tracing_subscriber::Registry;
13
// Name given to the short-lived trace-level spans that `OpenTelemetryContext::current`
// and `OpenTelemetryContext::set_as_parent_context` create in order to reach the
// OpenTelemetry data attached to spans by the tracing layer.
14const TRACE_CONTEXT_PROPAGATION_SPAN: &str = "trace context propagation";
15
// Public tracer name constant.
// NOTE(review): not referenced in this part of the file; presumably the name under
// which the OpenTelemetry tracer is registered elsewhere -- confirm against callers.
16pub const OCKAM_TRACER_NAME: &str = "ockam";
18
/// Serializable carrier for OpenTelemetry propagation data.
///
/// The wrapped map holds the string key/value pairs written by a text map propagator
/// (through the `Injector` implementation) and read back by it (through the `Extractor`
/// implementation). The `Serialize`/`Deserialize` derives together with the `Display` and
/// parsing implementations in this file allow the value to be converted to and from a string.
19#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
21pub struct OpenTelemetryContext(HashMap<String, String>);
22
23impl Hash for OpenTelemetryContext {
24 fn hash<H: Hasher>(&self, state: &mut H) {
25 self.to_string().hash(state)
26 }
27}
28
29impl PartialOrd for OpenTelemetryContext {
30 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
31 Some(self.cmp(other))
32 }
33}
34
35impl Ord for OpenTelemetryContext {
36 fn cmp(&self, other: &Self) -> Ordering {
37 self.to_string().cmp(&other.to_string())
38 }
39}
40
41impl OpenTelemetryContext {
42 pub fn extract(&self) -> Context {
44 global::get_text_map_propagator(|propagator| propagator.extract(self))
45 }
46
47 pub fn inject(context: &Context) -> Self {
49 global::get_text_map_propagator(|propagator| {
50 let mut propagation_context = OpenTelemetryContext::empty();
51 propagator.inject_context(context, &mut propagation_context);
52 propagation_context
53 })
54 }
55
56 pub fn update(mut self) -> OpenTelemetryContext {
58 let _guard = self.extract().attach();
59 let updated = OpenTelemetryContext::current();
60 self.0 = updated.0;
61 self
62 }
63
64 pub fn current() -> OpenTelemetryContext {
66 let span = tracing::trace_span!(TRACE_CONTEXT_PROPAGATION_SPAN);
80 let mut result = None;
81 tracing::dispatcher::get_default(|dispatcher| {
82 if let Some(registry) = dispatcher.downcast_ref::<Registry>() {
83 if let Some(id) = span.id() {
84 if let Some(span) = registry.span(&id) {
85 let mut extensions = span.extensions_mut();
86 if let Some(OtelData {
87 builder: _,
88 parent_cx,
89 }) = extensions.remove::<OtelData>()
90 {
91 result = Some(OpenTelemetryContext::inject(&parent_cx))
92 }
93 }
94 }
95 };
96 });
97 result.unwrap_or_else(|| OpenTelemetryContext::inject(&Context::current()))
100 }
101
102 fn set_as_parent_context(self) {
104 let parent_cx = self.extract();
105 let span = tracing::trace_span!(TRACE_CONTEXT_PROPAGATION_SPAN);
106 tracing::dispatcher::get_default(|dispatcher| {
107 if let Some(registry) = dispatcher.downcast_ref::<Registry>() {
108 if let Some(id) = span.id() {
109 if let Some(span) = registry.span(&id) {
110 if let Some(parent) = span.parent() {
111 let mut extensions = parent.extensions_mut();
112 if let Some(otel_data) = extensions.get_mut::<OtelData>() {
113 otel_data.parent_cx = parent_cx.clone();
114 }
115 }
116 {
117 let mut extensions = span.extensions_mut();
118 extensions.remove::<OtelData>();
119 }
120 }
121 }
122 };
123 })
124 }
125
126 pub fn current_context() -> Context {
128 OpenTelemetryContext::current().extract()
129 }
130
131 pub fn from_remote_context(tracing_context: &str) -> OpenTelemetryContext {
136 let result: Option<OpenTelemetryContext> = tracing_context.try_into().ok();
137 if let Some(tc) = result {
138 tc.set_as_parent_context()
139 };
140 OpenTelemetryContext::current()
141 }
142
143 fn empty() -> Self {
144 Self(HashMap::new())
145 }
146
147 pub fn as_map(&self) -> HashMap<String, String> {
149 self.0.clone()
150 }
151}
152
153impl Display for OpenTelemetryContext {
154 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
155 f.write_str(&serde_json::to_string(&self).map_err(|_| core::fmt::Error)?)
156 }
157}
158
159impl Injector for OpenTelemetryContext {
160 fn set(&mut self, key: &str, value: String) {
161 self.0.insert(key.to_owned(), value);
162 }
163}
164
165impl Extractor for OpenTelemetryContext {
166 fn get(&self, key: &str) -> Option<&str> {
167 let key = key.to_owned();
168 self.0.get(&key).map(|v| v.as_ref())
169 }
170
171 fn keys(&self) -> Vec<&str> {
172 self.0.keys().map(|k| k.as_ref()).collect()
173 }
174}
175
176impl TryFrom<&str> for OpenTelemetryContext {
178 type Error = crate::Error;
179
180 fn try_from(value: &str) -> crate::Result<Self> {
181 opentelemetry_context_parser(value)
182 }
183}
184
185impl FromStr for OpenTelemetryContext {
186 type Err = crate::Error;
187
188 fn from_str(s: &str) -> Result<Self, Self::Err> {
189 s.try_into()
190 }
191}
192
193impl TryFrom<String> for OpenTelemetryContext {
195 type Error = crate::Error;
196
197 fn try_from(value: String) -> crate::Result<Self> {
198 opentelemetry_context_parser(&value)
199 }
200}
201
202pub fn opentelemetry_context_parser(input: &str) -> crate::Result<OpenTelemetryContext> {
204 serde_json::from_str(input).map_err(|e| {
205 crate::Error::new(
206 Origin::Api,
207 Kind::Serialization,
208 format!("Invalid OpenTelemetry context: {input}. Got error: {e:?}"),
209 )
210 })
211}