1#[allow(unused_imports)]
5use std::convert::TryInto;
6use std::fmt::Display;
7use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10
11use futures_core::future::BoxFuture;
12#[cfg(feature = "isahc_collector_client")]
13#[allow(unused_imports)] use isahc::prelude::Configurable;
15
16use opentelemetry::{
17 trace::{Event, Link, SpanKind, Status},
18 InstrumentationLibrary, Key, KeyValue,
19};
20use opentelemetry_sdk::export::{
21 trace::{ExportResult, SpanData, SpanExporter},
22 ExportError,
23};
24use opentelemetry_sdk::trace::SpanEvents;
25
26use crate::exporter::uploader::Uploader;
27
28use self::runtime::JaegerTraceRuntime;
29use self::thrift::jaeger;
30
31mod agent;
32#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))]
33mod collector;
34pub(crate) mod runtime;
35#[allow(clippy::all, unreachable_pub, dead_code)]
36#[rustfmt::skip] mod thrift;
38pub mod config;
39pub(crate) mod transport;
40mod uploader;
41
/// Tag key under which the instrumentation library's name is exported on a span.
const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name";

/// Tag key under which the instrumentation library's version (when present) is exported on a span.
const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version";
47
48#[deprecated(
55 since = "0.21.0",
56 note = "Please migrate to opentelemetry-otlp exporter."
57)]
58#[derive(Debug)]
59pub struct Exporter {
60 export_instrumentation_lib: bool,
62 uploader: Arc<dyn Uploader>,
63 process: jaeger::Process,
64}
65
66impl Exporter {
67 fn new(
68 process: jaeger::Process,
69 export_instrumentation_lib: bool,
70 uploader: Arc<dyn Uploader>,
71 ) -> Exporter {
72 Exporter {
73 export_instrumentation_lib,
74 uploader,
75 process,
76 }
77 }
78}
79
/// Description of the process that emits spans: a service name plus a list of tags.
#[deprecated(
    since = "0.21.0",
    note = "Please migrate to opentelemetry-otlp exporter."
)]
#[derive(Debug, Default)]
pub struct Process {
    /// Name of the service; empty by default.
    pub service_name: String,
    /// Key/value tags attached to the process; empty by default.
    pub tags: Vec<KeyValue>,
}
97
98impl SpanExporter for Exporter {
99 fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
100 let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
101 let process = self.process.clone();
102
103 for span in batch.into_iter() {
104 jaeger_spans.push(convert_otel_span_into_jaeger_span(
105 span,
106 self.export_instrumentation_lib,
107 ));
108 }
109
110 let uploader = self.uploader.clone();
111 Box::pin(async move {
112 uploader
113 .upload(jaeger::Batch::new(process, jaeger_spans))
114 .await
115 })
116 }
117}
118
119fn links_to_references(links: &[Link]) -> Option<Vec<jaeger::SpanRef>> {
120 if !links.is_empty() {
121 let refs = links
122 .iter()
123 .map(|link| {
124 let span_context = &link.span_context;
125 let trace_id_bytes = span_context.trace_id().to_bytes();
126 let (high, low) = trace_id_bytes.split_at(8);
127 let trace_id_high = i64::from_be_bytes(high.try_into().unwrap());
128 let trace_id_low = i64::from_be_bytes(low.try_into().unwrap());
129
130 jaeger::SpanRef::new(
131 jaeger::SpanRefType::FollowsFrom,
132 trace_id_low,
133 trace_id_high,
134 i64::from_be_bytes(span_context.span_id().to_bytes()),
135 )
136 })
137 .collect();
138 Some(refs)
139 } else {
140 None
141 }
142}
143
144fn convert_otel_span_into_jaeger_span(span: SpanData, export_instrument_lib: bool) -> jaeger::Span {
146 let trace_id_bytes = span.span_context.trace_id().to_bytes();
147 let (high, low) = trace_id_bytes.split_at(8);
148 let trace_id_high = i64::from_be_bytes(high.try_into().unwrap());
149 let trace_id_low = i64::from_be_bytes(low.try_into().unwrap());
150 jaeger::Span {
151 trace_id_low,
152 trace_id_high,
153 span_id: i64::from_be_bytes(span.span_context.span_id().to_bytes()),
154 parent_span_id: i64::from_be_bytes(span.parent_span_id.to_bytes()),
155 operation_name: span.name.into_owned(),
156 references: links_to_references(span.links.as_ref()),
157 flags: span.span_context.trace_flags().to_u8() as i32,
158 start_time: span
159 .start_time
160 .duration_since(SystemTime::UNIX_EPOCH)
161 .unwrap_or_else(|_| Duration::from_secs(0))
162 .as_micros() as i64,
163 duration: span
164 .end_time
165 .duration_since(span.start_time)
166 .unwrap_or_else(|_| Duration::from_secs(0))
167 .as_micros() as i64,
168 tags: Some(build_span_tags(
169 span.attributes,
170 if export_instrument_lib {
171 Some(span.instrumentation_lib)
172 } else {
173 None
174 },
175 span.status,
176 span.span_kind,
177 )),
178 logs: events_to_logs(span.events),
179 }
180}
181
182fn build_span_tags(
183 attrs: Vec<KeyValue>,
184 instrumentation_lib: Option<InstrumentationLibrary>,
185 status: Status,
186 kind: SpanKind,
187) -> Vec<jaeger::Tag> {
188 let mut user_overrides = UserOverrides::default();
189 let mut tags = attrs
191 .into_iter()
192 .map(|kv| {
193 user_overrides.record_attr(kv.key.as_str());
194 kv.into()
195 })
196 .collect::<Vec<_>>();
197
198 if let Some(instrumentation_lib) = instrumentation_lib {
199 tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_NAME, instrumentation_lib.name).into());
201 if let Some(version) = instrumentation_lib.version {
202 tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_VERSION, version).into())
203 }
204 }
205
206 if !user_overrides.span_kind && kind != SpanKind::Internal {
207 tags.push(Key::new(SPAN_KIND).string(format_span_kind(kind)).into());
208 }
209
210 match status {
211 Status::Unset => {}
212 Status::Ok => {
213 if !user_overrides.status_code {
214 tags.push(KeyValue::new(OTEL_STATUS_CODE, "OK").into());
215 }
216 }
217 Status::Error {
218 description: message,
219 } => {
220 if !user_overrides.error {
221 tags.push(Key::new(ERROR).bool(true).into());
222 }
223
224 if !user_overrides.status_code {
225 tags.push(KeyValue::new(OTEL_STATUS_CODE, "ERROR").into());
226 }
227
228 if !message.is_empty() && !user_overrides.status_description {
229 tags.push(Key::new(OTEL_STATUS_DESCRIPTION).string(message).into());
230 }
231 }
232 }
233
234 tags
235}
236
237fn format_span_kind(kind: SpanKind) -> &'static str {
238 match kind {
239 SpanKind::Client => "client",
240 SpanKind::Server => "server",
241 SpanKind::Producer => "producer",
242 SpanKind::Consumer => "consumer",
243 SpanKind::Internal => "internal",
244 }
245}
246
/// Tag key marking a span as failed.
const ERROR: &str = "error";
/// Tag key carrying the span kind.
const SPAN_KIND: &str = "span.kind";
/// Tag key carrying the span status code.
const OTEL_STATUS_CODE: &str = "otel.status_code";
/// Tag key carrying the span status description.
const OTEL_STATUS_DESCRIPTION: &str = "otel.status_description";

/// Remembers which of the reserved tag keys above were already supplied as attributes.
///
/// Each flag starts out `false` and is switched on by [`UserOverrides::record_attr`].
#[derive(Default)]
struct UserOverrides {
    /// An attribute with key `error` was seen.
    error: bool,
    /// An attribute with key `span.kind` was seen.
    span_kind: bool,
    /// An attribute with key `otel.status_code` was seen.
    status_code: bool,
    /// An attribute with key `otel.status_description` was seen.
    status_description: bool,
}

impl UserOverrides {
    /// Marks the flag matching `attr`, if `attr` is one of the reserved keys.
    ///
    /// Keys are compared exactly (case-sensitive); any other key is ignored. Flags are
    /// never reset once set.
    fn record_attr(&mut self, attr: &str) {
        if attr == ERROR {
            self.error = true;
        } else if attr == SPAN_KIND {
            self.span_kind = true;
        } else if attr == OTEL_STATUS_CODE {
            self.status_code = true;
        } else if attr == OTEL_STATUS_DESCRIPTION {
            self.status_description = true;
        }
    }
}
271
272fn events_to_logs(events: SpanEvents) -> Option<Vec<jaeger::Log>> {
273 if events.is_empty() {
274 None
275 } else {
276 Some(events.into_iter().map(Into::into).collect())
277 }
278}
279
280#[derive(Debug)]
282pub enum Error {
283 ThriftAgentError(::thrift::Error),
288
289 ConfigError {
291 pipeline_name: &'static str,
293 config_name: &'static str,
295 reason: String,
297 },
298}
299
300impl std::error::Error for Error {}
301
302impl From<::thrift::Error> for Error {
303 fn from(value: ::thrift::Error) -> Self {
304 Error::ThriftAgentError(value)
305 }
306}
307
308impl Display for Error {
309 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310 match self {
311 Error::ThriftAgentError(err) => match err {
312 ::thrift::Error::Transport(transport_error) => {
313 write!(
314 f,
315 "thrift agent failed on transportation layer, {}, {}",
316 transport_error, transport_error.message
317 )
318 }
319 ::thrift::Error::Protocol(protocol_error) => {
320 write!(
321 f,
322 "thrift agent failed on protocol layer, {}, {}",
323 protocol_error, protocol_error.message
324 )
325 }
326 ::thrift::Error::Application(application_error) => {
327 write!(
328 f,
329 "thrift agent failed on application layer, {}, {}",
330 application_error, application_error.message
331 )
332 }
333 ::thrift::Error::User(error) => {
334 write!(f, "thrift agent failed, {}", error)
335 }
336 },
337 Error::ConfigError {
338 pipeline_name,
339 config_name,
340 reason,
341 } => write!(
342 f,
343 "{} pipeline fails because one of the configuration {} is invalid. {}",
344 pipeline_name, config_name, reason
345 ),
346 }
347 }
348}
349
impl ExportError for Error {
    /// Name of the exporter this error originates from; always `"jaeger"`.
    fn exporter_name(&self) -> &'static str {
        "jaeger"
    }
}
355
/// Picks the unspecified ("any") address, with port `0`, of the IP family used by the first
/// entry of `addrs`.
///
/// Only the first address is inspected. An IPv6 first address yields `[::]:0`; an IPv4 first
/// address, or an empty slice, yields `0.0.0.0:0`.
fn address_family(addrs: &[SocketAddr]) -> SocketAddr {
    let first_is_v6 = matches!(addrs.first(), Some(SocketAddr::V6(_)));

    if first_is_v6 {
        SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))
    } else {
        SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
    }
}
365
366#[cfg(test)]
367mod tests {
368 use opentelemetry::{
369 trace::{SpanKind, Status},
370 KeyValue,
371 };
372
373 use crate::exporter::thrift::jaeger::Tag;
374 use crate::exporter::{build_span_tags, OTEL_STATUS_CODE, OTEL_STATUS_DESCRIPTION};
375
376 use super::SPAN_KIND;
377
378 fn assert_tag_contains(tags: Vec<Tag>, key: &'static str, expect_val: &'static str) {
379 assert_eq!(
380 tags.into_iter()
381 .filter(|tag| tag.key.as_str() == key
382 && tag.v_str.as_deref().unwrap_or("") == expect_val)
383 .count(),
384 1,
385 "Expect a tag {} with value {} but found nothing",
386 key,
387 expect_val
388 );
389 }
390
391 fn assert_tag_not_contains(tags: Vec<Tag>, key: &'static str) {
392 assert_eq!(
393 tags.into_iter()
394 .filter(|tag| tag.key.as_str() == key)
395 .count(),
396 0,
397 "Not expect tag {}, but found something",
398 key
399 );
400 }
401
    /// Test table for `test_set_status`.
    ///
    /// Each entry is `(span status, expected `otel.status_code` value, expected
    /// `otel.status_description` value)`; `None` means the tag must be absent.
    #[rustfmt::skip]
    fn get_error_tag_test_data() -> Vec<(Status, Option<&'static str>, Option<&'static str>)>
    {
        vec![
            // Error with an empty description: status code only, no description tag.
            (Status::error(""), Some("ERROR"), None),
            // Unset status: neither tag.
            (Status::Unset, None, None),
            // Ok status: status code only.
            (Status::Ok, Some("OK"), None),
            // Error with a description: both tags.
            (Status::error("have message"), Some("ERROR"), Some("have message")),
            (Status::Unset, None, None),
        ]
    }
415
416 #[test]
417 fn test_set_status() {
418 for (status, status_tag_val, msg_tag_val) in get_error_tag_test_data() {
419 let tags = build_span_tags(Vec::new(), None, status, SpanKind::Client);
420 if let Some(val) = status_tag_val {
421 assert_tag_contains(tags.clone(), OTEL_STATUS_CODE, val);
422 } else {
423 assert_tag_not_contains(tags.clone(), OTEL_STATUS_CODE);
424 }
425
426 if let Some(val) = msg_tag_val {
427 assert_tag_contains(tags.clone(), OTEL_STATUS_DESCRIPTION, val);
428 } else {
429 assert_tag_not_contains(tags.clone(), OTEL_STATUS_DESCRIPTION);
430 }
431 }
432 }
433
434 #[test]
435 fn ignores_user_set_values() {
436 let mut attributes = Vec::new();
437 let user_error = true;
438 let user_kind = "server";
439 let user_status_description = "Something bad happened";
440 let user_status = Status::Error {
441 description: user_status_description.into(),
442 };
443 attributes.push(KeyValue::new("error", user_error));
444 attributes.push(KeyValue::new(SPAN_KIND, user_kind));
445 attributes.push(KeyValue::new(OTEL_STATUS_CODE, "ERROR"));
446 attributes.push(KeyValue::new(
447 OTEL_STATUS_DESCRIPTION,
448 user_status_description,
449 ));
450 let tags = build_span_tags(attributes, None, user_status, SpanKind::Client);
451
452 assert!(tags
453 .iter()
454 .filter(|tag| tag.key.as_str() == "error")
455 .all(|tag| tag.v_bool.unwrap()));
456 assert_tag_contains(tags.clone(), SPAN_KIND, user_kind);
457 assert_tag_contains(tags.clone(), OTEL_STATUS_CODE, "ERROR");
458 assert_tag_contains(tags, OTEL_STATUS_DESCRIPTION, user_status_description);
459 }
460
461 #[test]
462 fn error_message_should_contain_details() {
463 let size_limit_err =
464 crate::Error::from(::thrift::Error::Protocol(thrift::ProtocolError::new(
465 thrift::ProtocolErrorKind::SizeLimit,
466 "the error message should contain details".to_string(),
467 )));
468 assert_eq!(
469 format!("{}", size_limit_err),
470 "thrift agent failed on protocol layer, message too long, the error message should contain details"
471 );
472 }
473}