fastrace_opentelemetry/
lib.rs

1// Copyright 2024 FastLabs Developers
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This file is derived from [1] under the original license header:
16// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
17// [1]: https://github.com/tikv/minitrace-rust/blob/v0.6.4/minitrace-opentelemetry/src/lib.rs
18
19#![doc = include_str!("../README.md")]
20
21use std::borrow::Cow;
22use std::collections::HashSet;
23use std::fmt::Debug;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::LazyLock;
27use std::time::Duration;
28use std::time::SystemTime;
29
30use fastrace::collector::EventRecord;
31use fastrace::collector::Reporter;
32use fastrace::prelude::*;
33use opentelemetry::InstrumentationScope;
34use opentelemetry::KeyValue;
35use opentelemetry::trace::Event;
36use opentelemetry::trace::SpanContext as OtelSpanContext;
37use opentelemetry::trace::SpanKind;
38use opentelemetry::trace::Status;
39use opentelemetry::trace::TraceFlags;
40use opentelemetry::trace::TraceState;
41use opentelemetry_sdk::Resource;
42use opentelemetry_sdk::error::OTelSdkResult;
43use opentelemetry_sdk::trace::SpanData;
44use opentelemetry_sdk::trace::SpanEvents;
45use opentelemetry_sdk::trace::SpanExporter;
46use opentelemetry_sdk::trace::SpanLinks;
47
48/// [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-rust) reporter for `fastrace`.
49///
50/// `OpenTelemetryReporter` exports trace records to remote agents that implements the
51/// OpenTelemetry protocol, such as Jaeger, Zipkin, etc.
52///
53/// ## Span Kind
54///
55/// The reporter automatically maps the `span.kind` property from fastrace spans to OpenTelemetry
56/// span kinds. Supported values are: "client", "server", "producer", "consumer", and "internal"
57/// (case-insensitive). If no `span.kind` property is provided, spans default to
58/// `SpanKind::Internal`.
59///
60/// ## Span Status
61///
62/// The reporter maps the `span.status_code` and `span.status_description` properties from fastrace
63/// spans to OpenTelemetry span status. Supported codes are: "unset", "ok", and "error"
64/// (case-insensitive). If no `span.status_code` property is provided, spans default to
65/// `Status::Unset`. If the code is "error", the `span.status_description` property is used as the
66/// error description.
67///
68/// ## Parent Span Is Remote
69///
70/// The reporter maps the `span.parent_span_is_remote` property from fastrace spans to indicate
71/// whether the parent span is remote. Supported values are "true" and "false" (case-insensitive).
72/// If no `span.parent_span_is_remote` property is provided, it defaults to `false`.
73pub struct OpenTelemetryReporter {
74    exporter: Box<dyn DynSpanExporter>,
75    instrumentation_scope: InstrumentationScope,
76}
77
78/// Returns the OpenTelemetry [`SpanContext`] of the current fastrace local parent span.
79///
80/// This helper bridges fastrace's **thread-local parent stack** (set via
81/// [`Span::set_local_parent`]) into an OpenTelemetry
82/// `SpanContext` so you can interoperate with OpenTelemetry-based instrumentation.
83///
84/// It returns `None` when:
85/// - fastrace's `enable` feature is disabled (the local parent stack is inert), or
86/// - no local parent is currently set for the thread.
87///
88/// The returned span context is **non-recording** (it does not create an OpenTelemetry span on
89/// its own). To make it usable as a parent for OpenTelemetry spans, attach it to an
90/// OpenTelemetry [`Context`](opentelemetry::Context) via
91/// [`TraceContextExt::with_remote_span_context`](opentelemetry::trace::TraceContextExt::with_remote_span_context).
92///
93/// # Examples
94///
95/// ```rust, no_run
96/// use fastrace::prelude::*;
97/// use opentelemetry::Context;
98/// use opentelemetry::trace::TraceContextExt;
99///
100/// let root = Span::root("root", SpanContext::random());
101/// let _g = root.set_local_parent();
102///
103/// // Make the fastrace span the "current" OpenTelemetry parent for this thread.
104/// let _otel_guard = fastrace_opentelemetry::current_opentelemetry_context()
105///     .map(|cx| Context::current().with_remote_span_context(cx).attach());
106///
107/// // Any OpenTelemetry instrumentation that reads `Context::current()` can now
108/// // treat the fastrace span as its parent.
109/// ```
110pub fn current_opentelemetry_context() -> Option<OtelSpanContext> {
111    let span_context = fastrace::collector::SpanContext::current_local_parent()?;
112
113    let trace_flags = if span_context.sampled {
114        TraceFlags::SAMPLED
115    } else {
116        TraceFlags::default()
117    };
118
119    Some(OtelSpanContext::new(
120        span_context.trace_id.0.into(),
121        span_context.span_id.0.into(),
122        trace_flags,
123        false,
124        TraceState::default(),
125    ))
126}
127
128pub const SPAN_KIND: &str = "span.kind";
129pub const SPAN_STATUS_CODE: &str = "span.status_code";
130pub const SPAN_STATUS_DESCRIPTION: &str = "span.status_description";
131pub const SPAN_PARENT_SPAN_IS_REMOTE: &str = "span.parent_span_is_remote";
132
133static OTEL_PROPERTIES: LazyLock<HashSet<&str>> = LazyLock::new(|| {
134    HashSet::from([
135        SPAN_KIND,
136        SPAN_STATUS_CODE,
137        SPAN_STATUS_DESCRIPTION,
138        SPAN_PARENT_SPAN_IS_REMOTE,
139    ])
140});
141
142/// Convert a list of properties to a list of key-value pairs.
143fn map_props_to_kvs(props: Vec<(Cow<'static, str>, Cow<'static, str>)>) -> Vec<KeyValue> {
144    props
145        .into_iter()
146        .filter(|(k, _)| !OTEL_PROPERTIES.contains(k.as_ref()))
147        .map(|(k, v)| KeyValue::new(k, v))
148        .collect()
149}
150
151/// Convert a list of [`EventRecord`] to OpenTelemetry [`SpanEvents`].
152fn map_events(events: Vec<EventRecord>) -> SpanEvents {
153    let mut queue = SpanEvents::default();
154    queue.events.reserve(events.len());
155
156    for EventRecord {
157        name,
158        timestamp_unix_ns,
159        properties,
160    } in events
161    {
162        let time = SystemTime::UNIX_EPOCH + Duration::from_nanos(timestamp_unix_ns);
163        let attributes = map_props_to_kvs(properties);
164        queue.events.push(Event::new(name, time, attributes, 0));
165    }
166
167    queue
168}
169
170trait DynSpanExporter: Send + Sync + Debug {
171    fn export(
172        &self,
173        batch: Vec<SpanData>,
174    ) -> Pin<Box<dyn Future<Output = OTelSdkResult> + Send + '_>>;
175}
176
177impl<T: SpanExporter> DynSpanExporter for T {
178    fn export(
179        &self,
180        batch: Vec<SpanData>,
181    ) -> Pin<Box<dyn Future<Output = OTelSdkResult> + Send + '_>> {
182        Box::pin(SpanExporter::export(self, batch))
183    }
184}
185
186impl OpenTelemetryReporter {
187    pub fn new(
188        mut exporter: impl SpanExporter + 'static,
189        resource: Cow<'static, Resource>,
190        instrumentation_scope: InstrumentationScope,
191    ) -> Self {
192        exporter.set_resource(&resource);
193        OpenTelemetryReporter {
194            exporter: Box::new(exporter),
195            instrumentation_scope,
196        }
197    }
198
199    fn convert(&self, spans: Vec<SpanRecord>) -> Vec<SpanData> {
200        spans
201            .into_iter()
202            .map(
203                |SpanRecord {
204                     trace_id,
205                     span_id,
206                     parent_id,
207                     begin_time_unix_ns,
208                     duration_ns,
209                     name,
210                     properties,
211                     events,
212                 }| {
213                    let parent_span_id = parent_id.0.into();
214                    let span_kind = span_kind(&properties);
215                    let status = span_status(&properties);
216                    let parent_span_is_remote = parent_span_is_remote(&properties);
217                    let instrumentation_scope = self.instrumentation_scope.clone();
218                    let start_time =
219                        SystemTime::UNIX_EPOCH + Duration::from_nanos(begin_time_unix_ns);
220                    let end_time = SystemTime::UNIX_EPOCH
221                        + Duration::from_nanos(begin_time_unix_ns + duration_ns);
222                    let attributes = map_props_to_kvs(properties);
223                    let events = map_events(events);
224
225                    SpanData {
226                        span_context: OtelSpanContext::new(
227                            trace_id.0.into(),
228                            span_id.0.into(),
229                            TraceFlags::default(),
230                            parent_span_is_remote,
231                            TraceState::default(),
232                        ),
233                        parent_span_id,
234                        parent_span_is_remote,
235                        span_kind,
236                        name,
237                        start_time,
238                        end_time,
239                        attributes,
240                        dropped_attributes_count: 0,
241                        events,
242                        links: SpanLinks::default(),
243                        status,
244                        instrumentation_scope,
245                    }
246                },
247            )
248            .collect()
249    }
250
251    fn try_report(&mut self, spans: Vec<SpanRecord>) -> Result<(), Box<dyn std::error::Error>> {
252        let spans = self.convert(spans);
253        pollster::block_on(self.exporter.export(spans))?;
254        Ok(())
255    }
256}
257
258impl Reporter for OpenTelemetryReporter {
259    fn report(&mut self, spans: Vec<SpanRecord>) {
260        if spans.is_empty() {
261            return;
262        }
263
264        if let Err(err) = self.try_report(spans) {
265            log::error!("failed to report to opentelemetry: {err}");
266        }
267    }
268}
269
270fn span_kind(properties: &[(Cow<'static, str>, Cow<'static, str>)]) -> SpanKind {
271    properties
272        .iter()
273        .find(|(k, _)| k == SPAN_KIND)
274        .and_then(|(_, v)| match v.to_lowercase().as_str() {
275            "client" => Some(SpanKind::Client),
276            "server" => Some(SpanKind::Server),
277            "producer" => Some(SpanKind::Producer),
278            "consumer" => Some(SpanKind::Consumer),
279            "internal" => Some(SpanKind::Internal),
280            _ => None,
281        })
282        .unwrap_or(SpanKind::Internal)
283}
284
285fn span_status(properties: &[(Cow<'static, str>, Cow<'static, str>)]) -> Status {
286    let status_description = properties
287        .iter()
288        .find(|(k, _)| k == SPAN_STATUS_DESCRIPTION)
289        .map(|(_, v)| v.to_string())
290        .unwrap_or_default();
291    properties
292        .iter()
293        .find(|(k, _)| k == SPAN_STATUS_CODE)
294        .and_then(|(_, v)| match v.to_lowercase().as_str() {
295            "unset" => Some(Status::Unset),
296            "ok" => Some(Status::Ok),
297            "error" => Some(Status::Error {
298                description: status_description.into(),
299            }),
300            _ => None,
301        })
302        .unwrap_or(Status::Unset)
303}
304
305fn parent_span_is_remote(properties: &[(Cow<'static, str>, Cow<'static, str>)]) -> bool {
306    properties
307        .iter()
308        .find(|(k, _)| k == SPAN_PARENT_SPAN_IS_REMOTE)
309        .map(|(_, v)| v.to_lowercase().as_str() == "true")
310        .unwrap_or(false)
311}