foundations/telemetry/tracing/
output_jaeger_thrift_udp.rs1use super::internal::reporter_error;
2use crate::telemetry::settings::JaegerThriftUdpOutputSettings;
3use crate::{BootstrapResult, ServiceInfo};
4use anyhow::bail;
5use cf_rustracing::tag::Tag;
6use cf_rustracing_jaeger::reporter::JaegerCompactReporter;
7use cf_rustracing_jaeger::span::SpanReceiver;
8use futures_util::future::{BoxFuture, FutureExt as _};
9use std::net::SocketAddr;
10use std::net::{Ipv4Addr, Ipv6Addr};
11
12pub(super) fn start(
13 service_info: ServiceInfo,
14 settings: &JaegerThriftUdpOutputSettings,
15 span_rx: SpanReceiver,
16) -> BootstrapResult<BoxFuture<'static, BootstrapResult<()>>> {
17 let server_addr = settings.server_addr.into();
18 let reporter_bind_addr = get_reporter_bind_addr(settings)?;
19
20 let socket = std::net::UdpSocket::bind(reporter_bind_addr)?;
23
24 socket.set_nonblocking(true)?;
25
26 Ok(async move {
27 let mut reporter = JaegerCompactReporter::new_with_transport(
28 service_info.name,
29 server_addr,
30 tokio::net::UdpSocket::from_std(socket)?,
31 )?;
32
33 reporter.add_service_tag(Tag::new("app.version", service_info.version));
34
35 do_export(reporter, span_rx).await;
36
37 Ok(())
38 }
39 .boxed())
40}
41
42fn get_reporter_bind_addr(settings: &JaegerThriftUdpOutputSettings) -> BootstrapResult<SocketAddr> {
43 Ok(match settings.reporter_bind_addr {
44 Some(addr) => {
45 if settings.server_addr.is_ipv6() == addr.is_ipv6() {
48 addr.into()
49 } else {
50 bail!("`jaeger_tracing_server_addr` and `jaeger_reporter_bind_addr` must have the same address family");
51 }
52 }
53 None if settings.server_addr.is_ipv6() => (Ipv6Addr::LOCALHOST, 0).into(),
56 None => (Ipv4Addr::LOCALHOST, 0).into(),
57 })
58}
59
60async fn do_export(reporter: JaegerCompactReporter, mut span_rx: SpanReceiver) {
61 while let Some(span) = span_rx.recv().await {
62 let spans = [span];
64 if let Err(err) = reporter.report(&spans).await {
65 #[cfg(feature = "logging")]
66 if self::logging::is_msgsize_error(&err) {
67 self::logging::log_span_too_large_err(&err, &spans[0]);
68 continue;
69 }
70
71 reporter_error(err);
72 }
73 }
74}
75
76#[cfg(feature = "logging")]
77mod logging {
78 use cf_rustracing::tag::Tag;
79 use cf_rustracing_jaeger::span::FinishedSpan;
80 use std::io;
81
82 pub(super) fn is_msgsize_error(err: &cf_rustracing::Error) -> bool {
83 err.concrete_cause::<io::Error>()
84 .and_then(io::Error::raw_os_error)
85 == Some(libc::EMSGSIZE)
86 }
87
88 pub(super) fn log_span_too_large_err(err: &cf_rustracing::Error, span: &FinishedSpan) {
89 let tag_count = span.tags().len();
90 let tag_total: usize = span.tags().iter().map(tag_size).sum();
91 let top_tag = span.tags().iter().max_by_key(|t| tag_size(t));
92
93 let log_count = span.logs().len();
94 let log_total: usize = span.logs().iter().map(log_size).sum();
95
96 let top_tag = top_tag
97 .map(|t| format!(", top: {} @ approx {}", t.name(), tag_size(t)))
98 .unwrap_or_default();
99
100 crate::telemetry::log::error!(
101 "trace span exceeded thrift UDP message size limits";
102 "error" => %err,
103 "operation" => span.operation_name(),
104 "tags" => format!("count: {tag_count}, size: approx {tag_total}{top_tag}"),
105 "logs" => format!("count: {log_count}, size: approx {log_total}"),
106 );
107 }
108
109 fn tag_size(tag: &Tag) -> usize {
113 use cf_rustracing::tag::TagValue;
114 let val_size = match tag.value() {
115 TagValue::String(s) => s.len(),
116 TagValue::Boolean(_) => 1,
117 TagValue::Integer(_) => 8,
118 TagValue::Float(_) => 8,
119 };
120 tag.name().len() + val_size
121 }
122
123 fn log_size(log: &cf_rustracing::log::Log) -> usize {
125 log.fields()
126 .iter()
127 .map(|f| f.name().len() + f.value().len())
128 .sum()
129 }
130}