minitrace_opentelemetry/
lib.rs

1// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.
2
3#![doc = include_str!("../README.md")]
4
5use std::borrow::Cow;
6use std::time::Duration;
7use std::time::UNIX_EPOCH;
8
9use minitrace::collector::EventRecord;
10use minitrace::collector::Reporter;
11use minitrace::prelude::*;
12use opentelemetry::trace::Event;
13use opentelemetry::trace::SpanContext;
14use opentelemetry::trace::SpanKind;
15use opentelemetry::trace::Status;
16use opentelemetry::trace::TraceFlags;
17use opentelemetry::trace::TraceState;
18use opentelemetry::InstrumentationLibrary;
19use opentelemetry::Key;
20use opentelemetry::KeyValue;
21use opentelemetry::StringValue;
22use opentelemetry::Value;
23use opentelemetry_sdk::export::trace::SpanData;
24use opentelemetry_sdk::export::trace::SpanExporter;
25use opentelemetry_sdk::trace::SpanEvents;
26use opentelemetry_sdk::trace::SpanLinks;
27use opentelemetry_sdk::Resource;
28
29/// [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-rust) reporter for `minitrace`.
30///
31/// `OpenTelemetryReporter` exports trace records to remote agents that OpenTelemetry
32/// supports, which includes Jaeger, Datadog, Zipkin, and OpenTelemetry Collector.
33pub struct OpenTelemetryReporter {
34    opentelemetry_exporter: Box<dyn SpanExporter>,
35    span_kind: SpanKind,
36    resource: Cow<'static, Resource>,
37    instrumentation_lib: InstrumentationLibrary,
38}
39
40impl OpenTelemetryReporter {
41    pub fn new(
42        opentelemetry_exporter: impl SpanExporter + 'static,
43        span_kind: SpanKind,
44        resource: Cow<'static, Resource>,
45        instrumentation_lib: InstrumentationLibrary,
46    ) -> Self {
47        OpenTelemetryReporter {
48            opentelemetry_exporter: Box::new(opentelemetry_exporter),
49            span_kind,
50            resource,
51            instrumentation_lib,
52        }
53    }
54
55    fn convert(&self, spans: &[SpanRecord]) -> Vec<SpanData> {
56        spans
57            .iter()
58            .map(move |span| SpanData {
59                span_context: SpanContext::new(
60                    span.trace_id.0.into(),
61                    span.span_id.0.into(),
62                    TraceFlags::default(),
63                    false,
64                    TraceState::default(),
65                ),
66                dropped_attributes_count: 0,
67                parent_span_id: span.parent_id.0.into(),
68                name: span.name.clone(),
69                start_time: UNIX_EPOCH + Duration::from_nanos(span.begin_time_unix_ns),
70                end_time: UNIX_EPOCH
71                    + Duration::from_nanos(span.begin_time_unix_ns + span.duration_ns),
72                attributes: Self::convert_properties(&span.properties),
73                events: Self::convert_events(&span.events),
74                links: SpanLinks::default(),
75                status: Status::default(),
76                span_kind: self.span_kind.clone(),
77                resource: self.resource.clone(),
78                instrumentation_lib: self.instrumentation_lib.clone(),
79            })
80            .collect()
81    }
82
83    fn convert_properties(properties: &[(Cow<'static, str>, Cow<'static, str>)]) -> Vec<KeyValue> {
84        let mut map = Vec::new();
85        for (k, v) in properties {
86            map.push(KeyValue::new(
87                cow_to_otel_key(k.clone()),
88                cow_to_otel_value(v.clone()),
89            ));
90        }
91        map
92    }
93
94    fn convert_events(events: &[EventRecord]) -> SpanEvents {
95        let mut queue = SpanEvents::default();
96        queue.events.extend(events.iter().map(|event| {
97            Event::new(
98                event.name.clone(),
99                UNIX_EPOCH + Duration::from_nanos(event.timestamp_unix_ns),
100                event
101                    .properties
102                    .iter()
103                    .map(|(k, v)| {
104                        KeyValue::new(cow_to_otel_key(k.clone()), cow_to_otel_value(v.clone()))
105                    })
106                    .collect(),
107                0,
108            )
109        }));
110        queue
111    }
112
113    fn try_report(&mut self, spans: &[SpanRecord]) -> Result<(), Box<dyn std::error::Error>> {
114        let opentelemetry_spans = self.convert(spans);
115        futures::executor::block_on(self.opentelemetry_exporter.export(opentelemetry_spans))?;
116        Ok(())
117    }
118}
119
120impl Reporter for OpenTelemetryReporter {
121    fn report(&mut self, spans: &[SpanRecord]) {
122        if spans.is_empty() {
123            return;
124        }
125
126        if let Err(err) = self.try_report(spans) {
127            log::error!("report to opentelemetry failed: {}", err);
128        }
129    }
130}
131
132fn cow_to_otel_key(cow: Cow<'static, str>) -> Key {
133    match cow {
134        Cow::Borrowed(s) => Key::from_static_str(s),
135        Cow::Owned(s) => Key::from(s),
136    }
137}
138
139fn cow_to_otel_value(cow: Cow<'static, str>) -> Value {
140    match cow {
141        Cow::Borrowed(s) => Value::String(StringValue::from(s)),
142        Cow::Owned(s) => Value::String(StringValue::from(s)),
143    }
144}