1#![doc = include_str!("../README.md")]
20
21use std::borrow::Cow;
22use std::collections::HashSet;
23use std::fmt::Debug;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::LazyLock;
27use std::time::Duration;
28use std::time::SystemTime;
29
30use fastrace::collector::EventRecord;
31use fastrace::collector::Reporter;
32use fastrace::prelude::*;
33use opentelemetry::InstrumentationScope;
34use opentelemetry::KeyValue;
35use opentelemetry::trace::Event;
36use opentelemetry::trace::SpanContext as OtelSpanContext;
37use opentelemetry::trace::SpanKind;
38use opentelemetry::trace::Status;
39use opentelemetry::trace::TraceFlags;
40use opentelemetry::trace::TraceState;
41use opentelemetry_sdk::Resource;
42use opentelemetry_sdk::error::OTelSdkResult;
43use opentelemetry_sdk::trace::SpanData;
44use opentelemetry_sdk::trace::SpanEvents;
45use opentelemetry_sdk::trace::SpanExporter;
46use opentelemetry_sdk::trace::SpanLinks;
47
48pub struct OpenTelemetryReporter {
74 exporter: Box<dyn DynSpanExporter>,
75 instrumentation_scope: InstrumentationScope,
76}
77
78pub fn current_opentelemetry_context() -> Option<OtelSpanContext> {
111 let span_context = fastrace::collector::SpanContext::current_local_parent()?;
112
113 let trace_flags = if span_context.sampled {
114 TraceFlags::SAMPLED
115 } else {
116 TraceFlags::default()
117 };
118
119 Some(OtelSpanContext::new(
120 span_context.trace_id.0.into(),
121 span_context.span_id.0.into(),
122 trace_flags,
123 false,
124 TraceState::default(),
125 ))
126}
127
128pub const SPAN_KIND: &str = "span.kind";
129pub const SPAN_STATUS_CODE: &str = "span.status_code";
130pub const SPAN_STATUS_DESCRIPTION: &str = "span.status_description";
131pub const SPAN_PARENT_SPAN_IS_REMOTE: &str = "span.parent_span_is_remote";
132
133static OTEL_PROPERTIES: LazyLock<HashSet<&str>> = LazyLock::new(|| {
134 HashSet::from([
135 SPAN_KIND,
136 SPAN_STATUS_CODE,
137 SPAN_STATUS_DESCRIPTION,
138 SPAN_PARENT_SPAN_IS_REMOTE,
139 ])
140});
141
142fn map_props_to_kvs(props: Vec<(Cow<'static, str>, Cow<'static, str>)>) -> Vec<KeyValue> {
144 props
145 .into_iter()
146 .filter(|(k, _)| !OTEL_PROPERTIES.contains(k.as_ref()))
147 .map(|(k, v)| KeyValue::new(k, v))
148 .collect()
149}
150
151fn map_events(events: Vec<EventRecord>) -> SpanEvents {
153 let mut queue = SpanEvents::default();
154 queue.events.reserve(events.len());
155
156 for EventRecord {
157 name,
158 timestamp_unix_ns,
159 properties,
160 } in events
161 {
162 let time = SystemTime::UNIX_EPOCH + Duration::from_nanos(timestamp_unix_ns);
163 let attributes = map_props_to_kvs(properties);
164 queue.events.push(Event::new(name, time, attributes, 0));
165 }
166
167 queue
168}
169
170trait DynSpanExporter: Send + Sync + Debug {
171 fn export(
172 &self,
173 batch: Vec<SpanData>,
174 ) -> Pin<Box<dyn Future<Output = OTelSdkResult> + Send + '_>>;
175}
176
177impl<T: SpanExporter> DynSpanExporter for T {
178 fn export(
179 &self,
180 batch: Vec<SpanData>,
181 ) -> Pin<Box<dyn Future<Output = OTelSdkResult> + Send + '_>> {
182 Box::pin(SpanExporter::export(self, batch))
183 }
184}
185
186impl OpenTelemetryReporter {
187 pub fn new(
188 mut exporter: impl SpanExporter + 'static,
189 resource: Cow<'static, Resource>,
190 instrumentation_scope: InstrumentationScope,
191 ) -> Self {
192 exporter.set_resource(&resource);
193 OpenTelemetryReporter {
194 exporter: Box::new(exporter),
195 instrumentation_scope,
196 }
197 }
198
199 fn convert(&self, spans: Vec<SpanRecord>) -> Vec<SpanData> {
200 spans
201 .into_iter()
202 .map(
203 |SpanRecord {
204 trace_id,
205 span_id,
206 parent_id,
207 begin_time_unix_ns,
208 duration_ns,
209 name,
210 properties,
211 events,
212 }| {
213 let parent_span_id = parent_id.0.into();
214 let span_kind = span_kind(&properties);
215 let status = span_status(&properties);
216 let parent_span_is_remote = parent_span_is_remote(&properties);
217 let instrumentation_scope = self.instrumentation_scope.clone();
218 let start_time =
219 SystemTime::UNIX_EPOCH + Duration::from_nanos(begin_time_unix_ns);
220 let end_time = SystemTime::UNIX_EPOCH
221 + Duration::from_nanos(begin_time_unix_ns + duration_ns);
222 let attributes = map_props_to_kvs(properties);
223 let events = map_events(events);
224
225 SpanData {
226 span_context: OtelSpanContext::new(
227 trace_id.0.into(),
228 span_id.0.into(),
229 TraceFlags::default(),
230 parent_span_is_remote,
231 TraceState::default(),
232 ),
233 parent_span_id,
234 parent_span_is_remote,
235 span_kind,
236 name,
237 start_time,
238 end_time,
239 attributes,
240 dropped_attributes_count: 0,
241 events,
242 links: SpanLinks::default(),
243 status,
244 instrumentation_scope,
245 }
246 },
247 )
248 .collect()
249 }
250
251 fn try_report(&mut self, spans: Vec<SpanRecord>) -> Result<(), Box<dyn std::error::Error>> {
252 let spans = self.convert(spans);
253 pollster::block_on(self.exporter.export(spans))?;
254 Ok(())
255 }
256}
257
258impl Reporter for OpenTelemetryReporter {
259 fn report(&mut self, spans: Vec<SpanRecord>) {
260 if spans.is_empty() {
261 return;
262 }
263
264 if let Err(err) = self.try_report(spans) {
265 log::error!("failed to report to opentelemetry: {err}");
266 }
267 }
268}
269
270fn span_kind(properties: &[(Cow<'static, str>, Cow<'static, str>)]) -> SpanKind {
271 properties
272 .iter()
273 .find(|(k, _)| k == SPAN_KIND)
274 .and_then(|(_, v)| match v.to_lowercase().as_str() {
275 "client" => Some(SpanKind::Client),
276 "server" => Some(SpanKind::Server),
277 "producer" => Some(SpanKind::Producer),
278 "consumer" => Some(SpanKind::Consumer),
279 "internal" => Some(SpanKind::Internal),
280 _ => None,
281 })
282 .unwrap_or(SpanKind::Internal)
283}
284
285fn span_status(properties: &[(Cow<'static, str>, Cow<'static, str>)]) -> Status {
286 let status_description = properties
287 .iter()
288 .find(|(k, _)| k == SPAN_STATUS_DESCRIPTION)
289 .map(|(_, v)| v.to_string())
290 .unwrap_or_default();
291 properties
292 .iter()
293 .find(|(k, _)| k == SPAN_STATUS_CODE)
294 .and_then(|(_, v)| match v.to_lowercase().as_str() {
295 "unset" => Some(Status::Unset),
296 "ok" => Some(Status::Ok),
297 "error" => Some(Status::Error {
298 description: status_description.into(),
299 }),
300 _ => None,
301 })
302 .unwrap_or(Status::Unset)
303}
304
305fn parent_span_is_remote(properties: &[(Cow<'static, str>, Cow<'static, str>)]) -> bool {
306 properties
307 .iter()
308 .find(|(k, _)| k == SPAN_PARENT_SPAN_IS_REMOTE)
309 .map(|(_, v)| v.to_lowercase().as_str() == "true")
310 .unwrap_or(false)
311}