foundations/telemetry/tracing/
output_jaeger_thrift_udp.rs

1use super::internal::reporter_error;
2use crate::telemetry::settings::JaegerThriftUdpOutputSettings;
3use crate::{BootstrapResult, ServiceInfo};
4use anyhow::bail;
5use cf_rustracing::tag::Tag;
6use cf_rustracing_jaeger::reporter::JaegerCompactReporter;
7use cf_rustracing_jaeger::span::SpanReceiver;
8use futures_util::future::{BoxFuture, FutureExt as _};
9use std::net::SocketAddr;
10use std::net::{Ipv4Addr, Ipv6Addr};
11
12pub(super) fn start(
13    service_info: ServiceInfo,
14    settings: &JaegerThriftUdpOutputSettings,
15    span_rx: SpanReceiver,
16) -> BootstrapResult<BoxFuture<'static, BootstrapResult<()>>> {
17    let server_addr = settings.server_addr.into();
18    let reporter_bind_addr = get_reporter_bind_addr(settings)?;
19
20    // NOTE: do socket binding as early as possible. It's a good practice to disable binding
21    // with seccomp after the service initialisaion.
22    let socket = std::net::UdpSocket::bind(reporter_bind_addr)?;
23
24    socket.set_nonblocking(true)?;
25
26    Ok(async move {
27        let mut reporter = JaegerCompactReporter::new_with_transport(
28            service_info.name,
29            server_addr,
30            tokio::net::UdpSocket::from_std(socket)?,
31        )?;
32
33        reporter.add_service_tag(Tag::new("app.version", service_info.version));
34
35        do_export(reporter, span_rx).await;
36
37        Ok(())
38    }
39    .boxed())
40}
41
42fn get_reporter_bind_addr(settings: &JaegerThriftUdpOutputSettings) -> BootstrapResult<SocketAddr> {
43    Ok(match settings.reporter_bind_addr {
44        Some(addr) => {
45            // the reporter socket will attempt to send traffic to the
46            // agent address, so they have to use the same address family
47            if settings.server_addr.is_ipv6() == addr.is_ipv6() {
48                addr.into()
49            } else {
50                bail!("`jaeger_tracing_server_addr` and `jaeger_reporter_bind_addr` must have the same address family");
51            }
52        }
53        // caused by https://github.com/sile/rustracing_jaeger/blob/bc7d03f2f6ac6bc0269542089c8907279706ecb7/src/reporter.rs#L34,
54        // we need to also set the reporter to an ipv6 when agent is ipv6
55        None if settings.server_addr.is_ipv6() => (Ipv6Addr::LOCALHOST, 0).into(),
56        None => (Ipv4Addr::LOCALHOST, 0).into(),
57    })
58}
59
60async fn do_export(reporter: JaegerCompactReporter, mut span_rx: SpanReceiver) {
61    while let Some(span) = span_rx.recv().await {
62        // NOTE: we are limited with a UDP dgram size here, so doing batching is risky.
63        let spans = [span];
64        if let Err(err) = reporter.report(&spans).await {
65            #[cfg(feature = "logging")]
66            if self::logging::is_msgsize_error(&err) {
67                self::logging::log_span_too_large_err(&err, &spans[0]);
68                continue;
69            }
70
71            reporter_error(err);
72        }
73    }
74}
75
/// Diagnostics for spans that are rejected for exceeding the UDP message size.
#[cfg(feature = "logging")]
mod logging {
    use cf_rustracing::tag::Tag;
    use cf_rustracing_jaeger::span::FinishedSpan;
    use std::io;

    /// Returns `true` when `err` is caused by an I/O error whose raw OS error code is
    /// `EMSGSIZE`.
    pub(super) fn is_msgsize_error(err: &cf_rustracing::Error) -> bool {
        let os_code = err
            .concrete_cause::<io::Error>()
            .and_then(|io_err| io_err.raw_os_error());

        os_code == Some(libc::EMSGSIZE)
    }

    /// Logs an error describing a span that was too large to send.
    ///
    /// The log entry carries the original error, the span's operation name, the number
    /// and approximate total size of its tags (plus the largest single tag, if there is
    /// one), and the number and approximate total size of its logs.
    pub(super) fn log_span_too_large_err(err: &cf_rustracing::Error, span: &FinishedSpan) {
        let tags = span.tags();
        let tag_count = tags.len();
        let tag_total = tags.iter().map(tag_size).sum::<usize>();

        // Name the single largest tag, since it is the most likely culprit; empty when
        // the span has no tags at all.
        let top_tag = match tags.iter().max_by_key(|tag| tag_size(tag)) {
            Some(tag) => format!(", top: {} @ approx {}", tag.name(), tag_size(tag)),
            None => String::new(),
        };

        let logs = span.logs();
        let log_count = logs.len();
        let log_total = logs.iter().map(log_size).sum::<usize>();

        crate::telemetry::log::error!(
            "trace span exceeded thrift UDP message size limits";
            "error" => %err,
            "operation" => span.operation_name(),
            "tags" => format!("count: {tag_count}, size: approx {tag_total}{top_tag}"),
            "logs" => format!("count: {log_count}, size: approx {log_total}"),
        );
    }

    /// Estimates the wire size of a span `Tag` as name length plus value length.
    ///
    /// The estimate is not exact and is closer to the non-compact thrift encoding, but
    /// it is good enough to work out what makes a span trigger EMSGSIZE.
    fn tag_size(tag: &Tag) -> usize {
        use cf_rustracing::tag::TagValue;

        let value_len = match tag.value() {
            TagValue::String(text) => text.len(),
            TagValue::Boolean(_) => 1,
            TagValue::Integer(_) | TagValue::Float(_) => 8,
        };

        tag.name().len() + value_len
    }

    /// Estimates the wire size of a span `Log`, which is always stringified, as the
    /// combined length of every field's name and value.
    fn log_size(log: &cf_rustracing::log::Log) -> usize {
        log.fields()
            .iter()
            .fold(0, |total, field| total + field.name().len() + field.value().len())
    }
}