minitrace_opentelemetry/
lib.rs1#![doc = include_str!("../README.md")]
4
5use std::borrow::Cow;
6use std::time::Duration;
7use std::time::UNIX_EPOCH;
8
9use minitrace::collector::EventRecord;
10use minitrace::collector::Reporter;
11use minitrace::prelude::*;
12use opentelemetry::trace::Event;
13use opentelemetry::trace::SpanContext;
14use opentelemetry::trace::SpanKind;
15use opentelemetry::trace::Status;
16use opentelemetry::trace::TraceFlags;
17use opentelemetry::trace::TraceState;
18use opentelemetry::InstrumentationLibrary;
19use opentelemetry::Key;
20use opentelemetry::KeyValue;
21use opentelemetry::StringValue;
22use opentelemetry::Value;
23use opentelemetry_sdk::export::trace::SpanData;
24use opentelemetry_sdk::export::trace::SpanExporter;
25use opentelemetry_sdk::trace::SpanEvents;
26use opentelemetry_sdk::trace::SpanLinks;
27use opentelemetry_sdk::Resource;
28
29pub struct OpenTelemetryReporter {
34 opentelemetry_exporter: Box<dyn SpanExporter>,
35 span_kind: SpanKind,
36 resource: Cow<'static, Resource>,
37 instrumentation_lib: InstrumentationLibrary,
38}
39
40impl OpenTelemetryReporter {
41 pub fn new(
42 opentelemetry_exporter: impl SpanExporter + 'static,
43 span_kind: SpanKind,
44 resource: Cow<'static, Resource>,
45 instrumentation_lib: InstrumentationLibrary,
46 ) -> Self {
47 OpenTelemetryReporter {
48 opentelemetry_exporter: Box::new(opentelemetry_exporter),
49 span_kind,
50 resource,
51 instrumentation_lib,
52 }
53 }
54
55 fn convert(&self, spans: &[SpanRecord]) -> Vec<SpanData> {
56 spans
57 .iter()
58 .map(move |span| SpanData {
59 span_context: SpanContext::new(
60 span.trace_id.0.into(),
61 span.span_id.0.into(),
62 TraceFlags::default(),
63 false,
64 TraceState::default(),
65 ),
66 dropped_attributes_count: 0,
67 parent_span_id: span.parent_id.0.into(),
68 name: span.name.clone(),
69 start_time: UNIX_EPOCH + Duration::from_nanos(span.begin_time_unix_ns),
70 end_time: UNIX_EPOCH
71 + Duration::from_nanos(span.begin_time_unix_ns + span.duration_ns),
72 attributes: Self::convert_properties(&span.properties),
73 events: Self::convert_events(&span.events),
74 links: SpanLinks::default(),
75 status: Status::default(),
76 span_kind: self.span_kind.clone(),
77 resource: self.resource.clone(),
78 instrumentation_lib: self.instrumentation_lib.clone(),
79 })
80 .collect()
81 }
82
83 fn convert_properties(properties: &[(Cow<'static, str>, Cow<'static, str>)]) -> Vec<KeyValue> {
84 let mut map = Vec::new();
85 for (k, v) in properties {
86 map.push(KeyValue::new(
87 cow_to_otel_key(k.clone()),
88 cow_to_otel_value(v.clone()),
89 ));
90 }
91 map
92 }
93
94 fn convert_events(events: &[EventRecord]) -> SpanEvents {
95 let mut queue = SpanEvents::default();
96 queue.events.extend(events.iter().map(|event| {
97 Event::new(
98 event.name.clone(),
99 UNIX_EPOCH + Duration::from_nanos(event.timestamp_unix_ns),
100 event
101 .properties
102 .iter()
103 .map(|(k, v)| {
104 KeyValue::new(cow_to_otel_key(k.clone()), cow_to_otel_value(v.clone()))
105 })
106 .collect(),
107 0,
108 )
109 }));
110 queue
111 }
112
113 fn try_report(&mut self, spans: &[SpanRecord]) -> Result<(), Box<dyn std::error::Error>> {
114 let opentelemetry_spans = self.convert(spans);
115 futures::executor::block_on(self.opentelemetry_exporter.export(opentelemetry_spans))?;
116 Ok(())
117 }
118}
119
120impl Reporter for OpenTelemetryReporter {
121 fn report(&mut self, spans: &[SpanRecord]) {
122 if spans.is_empty() {
123 return;
124 }
125
126 if let Err(err) = self.try_report(spans) {
127 log::error!("report to opentelemetry failed: {}", err);
128 }
129 }
130}
131
132fn cow_to_otel_key(cow: Cow<'static, str>) -> Key {
133 match cow {
134 Cow::Borrowed(s) => Key::from_static_str(s),
135 Cow::Owned(s) => Key::from(s),
136 }
137}
138
139fn cow_to_otel_value(cow: Cow<'static, str>) -> Value {
140 match cow {
141 Cow::Borrowed(s) => Value::String(StringValue::from(s)),
142 Cow::Owned(s) => Value::String(StringValue::from(s)),
143 }
144}