use super::channel::SharedSpanReceiver;
use super::init::TraceOutputFutures;
use super::internal::reporter_error;
use crate::telemetry::settings::JaegerThriftUdpOutputSettings;
use crate::{BootstrapResult, ServiceInfo};
use anyhow::bail;
use cf_rustracing::tag::Tag;
use cf_rustracing_jaeger::reporter::JaegerCompactReporter;
use futures_util::future::FutureExt as _;
use std::net::SocketAddr;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
pub(super) fn start(
    service_info: &ServiceInfo,
    settings: &JaegerThriftUdpOutputSettings,
    span_rx: SharedSpanReceiver,
) -> BootstrapResult<TraceOutputFutures> {
    //! Sets up the Jaeger thrift-compact-over-UDP trace output.
    //!
    //! Binds a UDP socket to the address chosen by `get_reporter_bind_addr` and builds a
    //! `JaegerCompactReporter` that sends to `settings.server_addr`. The reporter carries an
    //! `app.version` service tag. The function returns export worker futures that all share
    //! the reporter and the span receiver. No initializer future is produced.
    //!
    //! Errors if the bind address is rejected, the socket cannot be bound, made non-blocking
    //! or adopted by tokio, or the reporter cannot be constructed.
    let max_batch_size = settings.max_batch_size;
    let server_addr = settings.server_addr.into();
    let bind_addr = get_reporter_bind_addr(settings)?;

    let std_socket = std::net::UdpSocket::bind(bind_addr)?;
    // tokio leaves it to the caller to put an adopted std socket into non-blocking mode.
    std_socket.set_nonblocking(true)?;
    let transport = tokio::net::UdpSocket::from_std(std_socket)?;

    let mut reporter =
        JaegerCompactReporter::new_with_transport(service_info.name, server_addr, transport)?;
    reporter.add_service_tag(Tag::new("app.version", service_info.version));
    let reporter = Arc::new(reporter);

    // NOTE(review): the number of export workers comes from `max_batch_size`, the same value
    // used as the per-receive span limit in `do_export`. Confirm this is intended rather than
    // a separate concurrency setting.
    let mut workers = Vec::with_capacity(max_batch_size);
    for _ in 0..max_batch_size {
        let worker = do_export(Arc::clone(&reporter), span_rx.clone(), max_batch_size);
        workers.push(worker.boxed());
    }

    Ok(TraceOutputFutures {
        initializer: None,
        workers,
    })
}
/// Chooses the local address the reporter's UDP socket binds to.
///
/// When `settings.reporter_bind_addr` is set, it is used as-is. It must be the same IP family
/// (v4 or v6) as `settings.server_addr`, otherwise an error is returned. When it is unset, the
/// loopback address of the server's family is used with port 0.
fn get_reporter_bind_addr(settings: &JaegerThriftUdpOutputSettings) -> BootstrapResult<SocketAddr> {
    let server_is_v6 = settings.server_addr.is_ipv6();

    match settings.reporter_bind_addr {
        Some(configured) if configured.is_ipv6() != server_is_v6 => bail!(
            "`jaeger_tracing_server_addr` and `jaeger_reporter_bind_addr` must have the same address family"
        ),
        Some(configured) => Ok(configured.into()),
        None => {
            // Port 0 lets the OS pick an ephemeral port.
            let loopback: SocketAddr = if server_is_v6 {
                (Ipv6Addr::LOCALHOST, 0).into()
            } else {
                (Ipv4Addr::LOCALHOST, 0).into()
            };
            Ok(loopback)
        }
    }
}
/// Export worker loop.
///
/// Repeatedly pulls up to `max_batch_size` spans from `span_rx` and reports each one to the
/// Jaeger server in its own `report` call. With the `logging` feature, a report that fails with
/// an `EMSGSIZE` OS error is logged with details about the offending span. Every other failure
/// goes to `reporter_error`.
///
/// The loop ends once `recv_many` yields zero spans. Presumably that means the channel is
/// closed; TODO confirm against `SharedSpanReceiver`.
async fn do_export(
    reporter: Arc<JaegerCompactReporter>,
    span_rx: SharedSpanReceiver,
    max_batch_size: usize,
) {
    let mut received = Vec::with_capacity(max_batch_size);

    while span_rx.recv_many(&mut received, max_batch_size).await > 0 {
        for finished in received.drain(..) {
            // Report one span at a time so a failure can be attributed to that span.
            let single_span = [finished];

            let err = match reporter.report(&single_span).await {
                Ok(_) => continue,
                Err(err) => err,
            };

            #[cfg(feature = "logging")]
            if self::logging::is_msgsize_error(&err) {
                self::logging::log_span_too_large_err(&err, &single_span[0]);
                continue;
            }

            reporter_error(err);
        }
    }
}
#[cfg(feature = "logging")]
mod logging {
    //! Diagnostics for spans whose report fails with an OS `EMSGSIZE` error.

    use cf_rustracing::tag::Tag;
    use cf_rustracing_jaeger::span::FinishedSpan;
    use std::io;

    /// Returns `true` if `err` has an underlying `io::Error` whose raw OS error code is
    /// `EMSGSIZE`. Otherwise, including when there is no such cause, returns `false`.
    pub(super) fn is_msgsize_error(err: &cf_rustracing::Error) -> bool {
        match err.concrete_cause::<io::Error>() {
            Some(io_err) => io_err.raw_os_error() == Some(libc::EMSGSIZE),
            None => false,
        }
    }

    /// Logs an error about `span` after reporting it failed with `err`.
    ///
    /// The record includes:
    /// - the span's operation name;
    /// - the count and approximate total size of its tags, plus the largest tag if any;
    /// - the count and approximate total size of its logs.
    ///
    /// Sizes are the rough estimates from `tag_size` and `log_size`.
    pub(super) fn log_span_too_large_err(err: &cf_rustracing::Error, span: &FinishedSpan) {
        let tags = span.tags();
        let logs = span.logs();

        let tag_count = tags.len();
        let log_count = logs.len();
        let tag_total = tags.iter().fold(0usize, |acc, tag| acc + tag_size(tag));
        let log_total = logs.iter().fold(0usize, |acc, entry| acc + log_size(entry));

        let top_tag = match tags.iter().max_by_key(|tag| tag_size(tag)) {
            Some(tag) => format!(", top: {} @ approx {}", tag.name(), tag_size(tag)),
            None => String::new(),
        };

        crate::telemetry::log::error!(
            "trace span exceeded thrift UDP message size limits";
            "error" => %err,
            "operation" => span.operation_name(),
            "tags" => format!("count: {tag_count}, size: approx {tag_total}{top_tag}"),
            "logs" => format!("count: {log_count}, size: approx {log_total}"),
        );
    }

    /// Rough size estimate for a tag.
    ///
    /// This is the name length plus a value size. String values count their byte length,
    /// booleans count 1, and integers and floats count 8.
    fn tag_size(tag: &Tag) -> usize {
        use cf_rustracing::tag::TagValue;

        let value_len = match tag.value() {
            TagValue::String(text) => text.len(),
            TagValue::Boolean(_) => 1,
            TagValue::Integer(_) | TagValue::Float(_) => 8,
        };
        value_len + tag.name().len()
    }

    /// Rough size estimate for a log entry: the sum of name length plus value length over
    /// its fields.
    fn log_size(log: &cf_rustracing::log::Log) -> usize {
        let mut total = 0;
        for field in log.fields() {
            total += field.name().len() + field.value().len();
        }
        total
    }
}