opentelemetry_jaeger/exporter/
mod.rs

1//! # Jaeger Exporter
2//!
// The linter does not detect that this import is used; this looks like a linter bug.
4#[allow(unused_imports)]
5use std::convert::TryInto;
6use std::fmt::Display;
7use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10
11use futures_core::future::BoxFuture;
12#[cfg(feature = "isahc_collector_client")]
13#[allow(unused_imports)] // this is actually used to configure authentication
14use isahc::prelude::Configurable;
15
16use opentelemetry::{
17    trace::{Event, Link, SpanKind, Status},
18    InstrumentationLibrary, Key, KeyValue,
19};
20use opentelemetry_sdk::export::{
21    trace::{ExportResult, SpanData, SpanExporter},
22    ExportError,
23};
24use opentelemetry_sdk::trace::SpanEvents;
25
26use crate::exporter::uploader::Uploader;
27
28use self::runtime::JaegerTraceRuntime;
29use self::thrift::jaeger;
30
31mod agent;
32#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))]
33mod collector;
34pub(crate) mod runtime;
35#[allow(clippy::all, unreachable_pub, dead_code)]
36#[rustfmt::skip] // don't format generated files
37mod thrift;
38pub mod config;
39pub(crate) mod transport;
40mod uploader;
41
/// The instrumentation library name MUST be reported in Jaeger span tags under this key.
const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name";

/// The instrumentation library version MUST be reported in Jaeger span tags under this key.
const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version";
47
/// Jaeger span exporter
///
/// Converts each batch of spans into Jaeger thrift spans and hands the
/// resulting batch, together with the process information, to the uploader.
///
/// Deprecation Notice:
/// Ingestion of OTLP is now supported in Jaeger; please check [crates.io] for more details.
///
/// [crates.io]: https://crates.io/crates/opentelemetry-jaeger
#[deprecated(
    since = "0.21.0",
    note = "Please migrate to opentelemetry-otlp exporter."
)]
#[derive(Debug)]
pub struct Exporter {
    /// Whether or not to export instrumentation information.
    export_instrumentation_lib: bool,
    /// Uploader that every converted batch is sent through.
    uploader: Arc<dyn Uploader>,
    /// Process information cloned into every exported batch.
    process: jaeger::Process,
}
65
66impl Exporter {
67    fn new(
68        process: jaeger::Process,
69        export_instrumentation_lib: bool,
70        uploader: Arc<dyn Uploader>,
71    ) -> Exporter {
72        Exporter {
73            export_instrumentation_lib,
74            uploader,
75            process,
76        }
77    }
78}
79
/// Jaeger process configuration
///
/// Deprecation Notice:
/// Ingestion of OTLP is now supported in Jaeger; please check [crates.io] for more details.
///
/// [crates.io]: https://crates.io/crates/opentelemetry-jaeger
#[deprecated(
    since = "0.21.0",
    note = "Please migrate to opentelemetry-otlp exporter."
)]
#[derive(Debug, Default)]
pub struct Process {
    /// Jaeger service name
    pub service_name: String,
    /// Jaeger tags
    pub tags: Vec<KeyValue>,
}
97
98impl SpanExporter for Exporter {
99    fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
100        let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
101        let process = self.process.clone();
102
103        for span in batch.into_iter() {
104            jaeger_spans.push(convert_otel_span_into_jaeger_span(
105                span,
106                self.export_instrumentation_lib,
107            ));
108        }
109
110        let uploader = self.uploader.clone();
111        Box::pin(async move {
112            uploader
113                .upload(jaeger::Batch::new(process, jaeger_spans))
114                .await
115        })
116    }
117}
118
119fn links_to_references(links: &[Link]) -> Option<Vec<jaeger::SpanRef>> {
120    if !links.is_empty() {
121        let refs = links
122            .iter()
123            .map(|link| {
124                let span_context = &link.span_context;
125                let trace_id_bytes = span_context.trace_id().to_bytes();
126                let (high, low) = trace_id_bytes.split_at(8);
127                let trace_id_high = i64::from_be_bytes(high.try_into().unwrap());
128                let trace_id_low = i64::from_be_bytes(low.try_into().unwrap());
129
130                jaeger::SpanRef::new(
131                    jaeger::SpanRefType::FollowsFrom,
132                    trace_id_low,
133                    trace_id_high,
134                    i64::from_be_bytes(span_context.span_id().to_bytes()),
135                )
136            })
137            .collect();
138        Some(refs)
139    } else {
140        None
141    }
142}
143
144/// Convert spans to jaeger thrift span for exporting.
145fn convert_otel_span_into_jaeger_span(span: SpanData, export_instrument_lib: bool) -> jaeger::Span {
146    let trace_id_bytes = span.span_context.trace_id().to_bytes();
147    let (high, low) = trace_id_bytes.split_at(8);
148    let trace_id_high = i64::from_be_bytes(high.try_into().unwrap());
149    let trace_id_low = i64::from_be_bytes(low.try_into().unwrap());
150    jaeger::Span {
151        trace_id_low,
152        trace_id_high,
153        span_id: i64::from_be_bytes(span.span_context.span_id().to_bytes()),
154        parent_span_id: i64::from_be_bytes(span.parent_span_id.to_bytes()),
155        operation_name: span.name.into_owned(),
156        references: links_to_references(span.links.as_ref()),
157        flags: span.span_context.trace_flags().to_u8() as i32,
158        start_time: span
159            .start_time
160            .duration_since(SystemTime::UNIX_EPOCH)
161            .unwrap_or_else(|_| Duration::from_secs(0))
162            .as_micros() as i64,
163        duration: span
164            .end_time
165            .duration_since(span.start_time)
166            .unwrap_or_else(|_| Duration::from_secs(0))
167            .as_micros() as i64,
168        tags: Some(build_span_tags(
169            span.attributes,
170            if export_instrument_lib {
171                Some(span.instrumentation_lib)
172            } else {
173                None
174            },
175            span.status,
176            span.span_kind,
177        )),
178        logs: events_to_logs(span.events),
179    }
180}
181
182fn build_span_tags(
183    attrs: Vec<KeyValue>,
184    instrumentation_lib: Option<InstrumentationLibrary>,
185    status: Status,
186    kind: SpanKind,
187) -> Vec<jaeger::Tag> {
188    let mut user_overrides = UserOverrides::default();
189    // TODO determine if namespacing is required to avoid collisions with set attributes
190    let mut tags = attrs
191        .into_iter()
192        .map(|kv| {
193            user_overrides.record_attr(kv.key.as_str());
194            kv.into()
195        })
196        .collect::<Vec<_>>();
197
198    if let Some(instrumentation_lib) = instrumentation_lib {
199        // Set instrument library tags
200        tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_NAME, instrumentation_lib.name).into());
201        if let Some(version) = instrumentation_lib.version {
202            tags.push(KeyValue::new(INSTRUMENTATION_LIBRARY_VERSION, version).into())
203        }
204    }
205
206    if !user_overrides.span_kind && kind != SpanKind::Internal {
207        tags.push(Key::new(SPAN_KIND).string(format_span_kind(kind)).into());
208    }
209
210    match status {
211        Status::Unset => {}
212        Status::Ok => {
213            if !user_overrides.status_code {
214                tags.push(KeyValue::new(OTEL_STATUS_CODE, "OK").into());
215            }
216        }
217        Status::Error {
218            description: message,
219        } => {
220            if !user_overrides.error {
221                tags.push(Key::new(ERROR).bool(true).into());
222            }
223
224            if !user_overrides.status_code {
225                tags.push(KeyValue::new(OTEL_STATUS_CODE, "ERROR").into());
226            }
227
228            if !message.is_empty() && !user_overrides.status_description {
229                tags.push(Key::new(OTEL_STATUS_DESCRIPTION).string(message).into());
230            }
231        }
232    }
233
234    tags
235}
236
237fn format_span_kind(kind: SpanKind) -> &'static str {
238    match kind {
239        SpanKind::Client => "client",
240        SpanKind::Server => "server",
241        SpanKind::Producer => "producer",
242        SpanKind::Consumer => "consumer",
243        SpanKind::Internal => "internal",
244    }
245}
246
/// Tag key of the boolean tag set to `true` for spans with an error status.
const ERROR: &str = "error";
/// Tag key carrying the formatted span kind.
const SPAN_KIND: &str = "span.kind";
/// Tag key carrying the status code (`"OK"` or `"ERROR"`).
const OTEL_STATUS_CODE: &str = "otel.status_code";
/// Tag key carrying the description of an error status.
const OTEL_STATUS_DESCRIPTION: &str = "otel.status_description";
251
/// Tracks which of the reserved tag keys were already supplied as span
/// attributes, so that the exporter does not emit a second tag with that key.
#[derive(Default)]
struct UserOverrides {
    /// An attribute with the key `error` was seen.
    error: bool,
    /// An attribute with the key `span.kind` was seen.
    span_kind: bool,
    /// An attribute with the key `otel.status_code` was seen.
    status_code: bool,
    /// An attribute with the key `otel.status_description` was seen.
    status_description: bool,
}
259
260impl UserOverrides {
261    fn record_attr(&mut self, attr: &str) {
262        match attr {
263            ERROR => self.error = true,
264            SPAN_KIND => self.span_kind = true,
265            OTEL_STATUS_CODE => self.status_code = true,
266            OTEL_STATUS_DESCRIPTION => self.status_description = true,
267            _ => (),
268        }
269    }
270}
271
272fn events_to_logs(events: SpanEvents) -> Option<Vec<jaeger::Log>> {
273    if events.is_empty() {
274        None
275    } else {
276        Some(events.into_iter().map(Into::into).collect())
277    }
278}
279
/// Wrapper type for errors from opentelemetry jaeger
#[derive(Debug)]
pub enum Error {
    /// Error from thrift agents.
    ///
    /// If the spans were sent to the Jaeger agent, refer to [AgentPipeline](config::agent::AgentPipeline) for more details.
    /// If the spans were sent to the Jaeger collector, refer to [CollectorPipeline](config::collector::CollectorPipeline) for more details.
    ThriftAgentError(::thrift::Error),

    /// The pipeline failed because one of its configurations is invalid.
    ConfigError {
        /// The name of the pipeline. It can be `agent`, `collector` or `wasm collector`.
        pipeline_name: &'static str,
        /// The name of the configuration that has the error.
        config_name: &'static str,
        /// The underlying error message.
        reason: String,
    },
}
299
// No `source()` override: the `Display` output already includes the details
// of the wrapped thrift error.
impl std::error::Error for Error {}
301
302impl From<::thrift::Error> for Error {
303    fn from(value: ::thrift::Error) -> Self {
304        Error::ThriftAgentError(value)
305    }
306}
307
308impl Display for Error {
309    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310        match self {
311            Error::ThriftAgentError(err) => match err {
312                ::thrift::Error::Transport(transport_error) => {
313                    write!(
314                        f,
315                        "thrift agent failed on transportation layer, {}, {}",
316                        transport_error, transport_error.message
317                    )
318                }
319                ::thrift::Error::Protocol(protocol_error) => {
320                    write!(
321                        f,
322                        "thrift agent failed on protocol layer, {}, {}",
323                        protocol_error, protocol_error.message
324                    )
325                }
326                ::thrift::Error::Application(application_error) => {
327                    write!(
328                        f,
329                        "thrift agent failed on application layer, {}, {}",
330                        application_error, application_error.message
331                    )
332                }
333                ::thrift::Error::User(error) => {
334                    write!(f, "thrift agent failed, {}", error)
335                }
336            },
337            Error::ConfigError {
338                pipeline_name,
339                config_name,
340                reason,
341            } => write!(
342                f,
343                "{} pipeline fails because one of the configuration {} is invalid. {}",
344                pipeline_name, config_name, reason
345            ),
346        }
347    }
348}
349
impl ExportError for Error {
    /// Name reported for this exporter: always `"jaeger"`.
    fn exporter_name(&self) -> &'static str {
        "jaeger"
    }
}
355
/// Sample the first address provided to decide which IP family the socket
/// should be bound to.
///
/// Returns the unspecified address with port `0` of that family: INADDR_ANY
/// ([`Ipv4Addr::UNSPECIFIED`]) when the first address is IPv4 or when `addrs`
/// is empty, and IN6ADDR_ANY ([`Ipv6Addr::UNSPECIFIED`]) when it is IPv6.
fn address_family(addrs: &[SocketAddr]) -> SocketAddr {
    let first_is_v6 = matches!(addrs.first(), Some(SocketAddr::V6(_)));
    if first_is_v6 {
        SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))
    } else {
        SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
    }
}
365
#[cfg(test)]
mod tests {
    use opentelemetry::{
        trace::{SpanKind, Status},
        KeyValue,
    };

    use crate::exporter::thrift::jaeger::Tag;
    use crate::exporter::{build_span_tags, OTEL_STATUS_CODE, OTEL_STATUS_DESCRIPTION};

    use super::SPAN_KIND;

    /// Assert that exactly one tag has the key `key` and the string value
    /// `expect_val`. A tag without a string value is treated as having the
    /// empty string as its value.
    fn assert_tag_contains(tags: &[Tag], key: &str, expect_val: &str) {
        let found = tags
            .iter()
            .filter(|tag| {
                tag.key.as_str() == key && tag.v_str.as_deref().unwrap_or("") == expect_val
            })
            .count();
        assert_eq!(
            found, 1,
            "Expected exactly one tag {} with value {}, but found {}",
            key, expect_val, found
        );
    }

    /// Assert that no tag has the key `key`.
    fn assert_tag_not_contains(tags: &[Tag], key: &str) {
        let found = tags.iter().filter(|tag| tag.key.as_str() == key).count();
        assert_eq!(found, 0, "Expected no tag {}, but found {}", key, found);
    }

    #[rustfmt::skip]
    fn get_error_tag_test_data() -> Vec<(Status, Option<&'static str>, Option<&'static str>)>
    {
        // Status, OTEL_STATUS_CODE tag value, OTEL_STATUS_DESCRIPTION tag value
        vec![
            (Status::error(""), Some("ERROR"), None),
            // When status is ok, no description should be in span data. This should be ensured by Otel API
            (Status::Ok, Some("OK"), None),
            (Status::error("have message"), Some("ERROR"), Some("have message")),
            (Status::Unset, None, None),
        ]
    }

    #[test]
    fn test_set_status() {
        for (status, status_tag_val, msg_tag_val) in get_error_tag_test_data() {
            let tags = build_span_tags(Vec::new(), None, status, SpanKind::Client);

            match status_tag_val {
                Some(val) => assert_tag_contains(&tags, OTEL_STATUS_CODE, val),
                None => assert_tag_not_contains(&tags, OTEL_STATUS_CODE),
            }

            match msg_tag_val {
                Some(val) => assert_tag_contains(&tags, OTEL_STATUS_DESCRIPTION, val),
                None => assert_tag_not_contains(&tags, OTEL_STATUS_DESCRIPTION),
            }
        }
    }

    #[test]
    fn ignores_user_set_values() {
        let user_error = true;
        let user_kind = "server";
        let user_status_description = "Something bad happened";
        let user_status = Status::Error {
            description: user_status_description.into(),
        };
        let attributes = vec![
            KeyValue::new("error", user_error),
            KeyValue::new(SPAN_KIND, user_kind),
            KeyValue::new(OTEL_STATUS_CODE, "ERROR"),
            KeyValue::new(OTEL_STATUS_DESCRIPTION, user_status_description),
        ];
        let tags = build_span_tags(attributes, None, user_status, SpanKind::Client);

        // The user-supplied `error` attribute must be the only `error` tag:
        // require exactly one, so the check cannot pass vacuously when the tag
        // is missing and also fails if the exporter adds a duplicate.
        let error_tags: Vec<_> = tags
            .iter()
            .filter(|tag| tag.key.as_str() == "error")
            .collect();
        assert_eq!(
            error_tags.len(),
            1,
            "Expected exactly one error tag, but found {}",
            error_tags.len()
        );
        assert_eq!(error_tags[0].v_bool, Some(user_error));

        assert_tag_contains(&tags, SPAN_KIND, user_kind);
        assert_tag_contains(&tags, OTEL_STATUS_CODE, "ERROR");
        assert_tag_contains(&tags, OTEL_STATUS_DESCRIPTION, user_status_description);
    }

    #[test]
    fn error_message_should_contain_details() {
        let size_limit_err =
            crate::Error::from(::thrift::Error::Protocol(thrift::ProtocolError::new(
                thrift::ProtocolErrorKind::SizeLimit,
                "the error message should contain details".to_string(),
            )));
        assert_eq!(
            format!("{}", size_limit_err),
            "thrift agent failed on protocol layer, message too long, the error message should contain details"
        );
    }
}
473}