// ockam_core/routing/message/opentelemetry.rs

1use crate::errcode::{Kind, Origin};
2use core::fmt::{Display, Formatter};
3use core::str::FromStr;
4use opentelemetry::propagation::{Extractor, Injector};
5use opentelemetry::{global, Context};
6use serde::{Deserialize, Serialize};
7use std::cmp::Ordering;
8use std::collections::HashMap;
9use std::hash::{Hash, Hasher};
10use tracing_opentelemetry::OtelData;
11use tracing_subscriber::registry::LookupSpan;
12use tracing_subscriber::Registry;
13
/// Name given to the temporary spans created by `OpenTelemetryContext::current` and
/// `OpenTelemetryContext::set_as_parent_context` when they look up span data in the registry.
const TRACE_CONTEXT_PROPAGATION_SPAN: &str = "trace context propagation";

/// Name of the global Ockam tracer
pub const OCKAM_TRACER_NAME: &str = "ockam";
18
/// Serializable data type to hold the opentelemetry propagation context.
///
/// The context is stored as a map of string keys to string values. Entries are written
/// through the [`Injector`] implementation of this type and read back through its
/// [`Extractor`] implementation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct OpenTelemetryContext(HashMap<String, String>);
22
23impl Hash for OpenTelemetryContext {
24    fn hash<H: Hasher>(&self, state: &mut H) {
25        self.to_string().hash(state)
26    }
27}
28
29impl PartialOrd for OpenTelemetryContext {
30    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
31        Some(self.cmp(other))
32    }
33}
34
35impl Ord for OpenTelemetryContext {
36    fn cmp(&self, other: &Self) -> Ordering {
37        self.to_string().cmp(&other.to_string())
38    }
39}
40
41impl OpenTelemetryContext {
42    /// Recover an OpenTelemetry context from the currently serialized data
43    pub fn extract(&self) -> Context {
44        global::get_text_map_propagator(|propagator| propagator.extract(self))
45    }
46
47    /// Serialize the current OpenTelemetry context as OpenTelemetryContext
48    pub fn inject(context: &Context) -> Self {
49        global::get_text_map_propagator(|propagator| {
50            let mut propagation_context = OpenTelemetryContext::empty();
51            propagator.inject_context(context, &mut propagation_context);
52            propagation_context
53        })
54    }
55
56    /// Update the OpenTelemetryContext with the latest span id
57    pub fn update(mut self) -> OpenTelemetryContext {
58        let _guard = self.extract().attach();
59        let updated = OpenTelemetryContext::current();
60        self.0 = updated.0;
61        self
62    }
63
64    /// Return the current OpenTelemetryContext
65    pub fn current() -> OpenTelemetryContext {
66        // In order to get the current OpenTelemetry context that is connected to the
67        // current span, as instrumented with the #[instrument] attribute, we need to:
68        //
69        //   1. Create a temporary span.
70        //   2. Get its data, given its id, from the global registry.
71        //   3. In the span extensions we can find the OpenTelemetry context that is used to attribute span ids.
72        //      That context contains the span id of the latest span created with OpenTelemetry.
73        //      That span is not the dummy span created below but the latest span created with #[instrument] in the
74        //      current call stack.
75        //      Note that opentelemetry::Context::current() would return a Context which only contains the latest context
76        //      created with `tracer::in_span(...)` which is at the root of this trace. This is why we have to dig deep
77        //      in order to retrieve the correct span id.
78        //   4. Remove the OtelData extension so that our dummy "trace context propagation span" doesn't get emitted.
79        let span = tracing::trace_span!(TRACE_CONTEXT_PROPAGATION_SPAN);
80        let mut result = None;
81        tracing::dispatcher::get_default(|dispatcher| {
82            if let Some(registry) = dispatcher.downcast_ref::<Registry>() {
83                if let Some(id) = span.id() {
84                    if let Some(span) = registry.span(&id) {
85                        let mut extensions = span.extensions_mut();
86                        if let Some(OtelData {
87                            builder: _,
88                            parent_cx,
89                        }) = extensions.remove::<OtelData>()
90                        {
91                            result = Some(OpenTelemetryContext::inject(&parent_cx))
92                        }
93                    }
94                }
95            };
96        });
97        // If, for some reason, we cannot retrieve the proper tracing context, we use the latest known
98        // OpenTelemetry context
99        result.unwrap_or_else(|| OpenTelemetryContext::inject(&Context::current()))
100    }
101
102    /// Set this OpenTelemetry context as the new parent context
103    fn set_as_parent_context(self) {
104        let parent_cx = self.extract();
105        let span = tracing::trace_span!(TRACE_CONTEXT_PROPAGATION_SPAN);
106        tracing::dispatcher::get_default(|dispatcher| {
107            if let Some(registry) = dispatcher.downcast_ref::<Registry>() {
108                if let Some(id) = span.id() {
109                    if let Some(span) = registry.span(&id) {
110                        if let Some(parent) = span.parent() {
111                            let mut extensions = parent.extensions_mut();
112                            if let Some(otel_data) = extensions.get_mut::<OtelData>() {
113                                otel_data.parent_cx = parent_cx.clone();
114                            }
115                        }
116                        {
117                            let mut extensions = span.extensions_mut();
118                            extensions.remove::<OtelData>();
119                        }
120                    }
121                }
122            };
123        })
124    }
125
126    /// Return the current opentelemetry::Context
127    pub fn current_context() -> Context {
128        OpenTelemetryContext::current().extract()
129    }
130
131    /// Parse a serialized tracing context, set it as the current parent context
132    /// and return the current OpenTelemetry context
133    /// This function is use to start new traces when receiving a serialized OpenTelemetryContext
134    /// from remote nodes.
135    pub fn from_remote_context(tracing_context: &str) -> OpenTelemetryContext {
136        let result: Option<OpenTelemetryContext> = tracing_context.try_into().ok();
137        if let Some(tc) = result {
138            tc.set_as_parent_context()
139        };
140        OpenTelemetryContext::current()
141    }
142
143    fn empty() -> Self {
144        Self(HashMap::new())
145    }
146
147    /// Return the keys and values for testing
148    pub fn as_map(&self) -> HashMap<String, String> {
149        self.0.clone()
150    }
151}
152
153impl Display for OpenTelemetryContext {
154    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
155        f.write_str(&serde_json::to_string(&self).map_err(|_| core::fmt::Error)?)
156    }
157}
158
159impl Injector for OpenTelemetryContext {
160    fn set(&mut self, key: &str, value: String) {
161        self.0.insert(key.to_owned(), value);
162    }
163}
164
165impl Extractor for OpenTelemetryContext {
166    fn get(&self, key: &str) -> Option<&str> {
167        let key = key.to_owned();
168        self.0.get(&key).map(|v| v.as_ref())
169    }
170
171    fn keys(&self) -> Vec<&str> {
172        self.0.keys().map(|k| k.as_ref()).collect()
173    }
174}
175
176/// Parse the OpenTelemetry context from a String
177impl TryFrom<&str> for OpenTelemetryContext {
178    type Error = crate::Error;
179
180    fn try_from(value: &str) -> crate::Result<Self> {
181        opentelemetry_context_parser(value)
182    }
183}
184
185impl FromStr for OpenTelemetryContext {
186    type Err = crate::Error;
187
188    fn from_str(s: &str) -> Result<Self, Self::Err> {
189        s.try_into()
190    }
191}
192
193/// Parse the OpenTelemetry context from a String
194impl TryFrom<String> for OpenTelemetryContext {
195    type Error = crate::Error;
196
197    fn try_from(value: String) -> crate::Result<Self> {
198        opentelemetry_context_parser(&value)
199    }
200}
201
202/// Parse the OpenTelemetry context from a String
203pub fn opentelemetry_context_parser(input: &str) -> crate::Result<OpenTelemetryContext> {
204    serde_json::from_str(input).map_err(|e| {
205        crate::Error::new(
206            Origin::Api,
207            Kind::Serialization,
208            format!("Invalid OpenTelemetry context: {input}. Got error: {e:?}"),
209        )
210    })
211}