use super::internal::reporter_error;
use crate::telemetry::settings::JaegerThriftUdpOutputSettings;
use crate::{BootstrapResult, ServiceInfo};
use anyhow::bail;
use cf_rustracing::tag::Tag;
use cf_rustracing_jaeger::reporter::JaegerCompactReporter;
use cf_rustracing_jaeger::span::SpanReceiver;
use futures_util::future::{BoxFuture, FutureExt as _};
use std::net::SocketAddr;
use std::net::{Ipv4Addr, Ipv6Addr};
/// Prepares the Jaeger Thrift-over-UDP span exporter.
///
/// The local UDP socket is bound and switched to non-blocking mode eagerly, so
/// bind failures are returned from this function directly. Everything else
/// (wrapping the socket for Tokio, constructing the reporter and draining
/// `span_rx`) is deferred to the returned future.
///
/// # Errors
///
/// Fails if the reporter bind address cannot be determined from `settings`,
/// or if binding / configuring the UDP socket fails. The returned future
/// resolves to an error if the socket cannot be converted into a Tokio socket
/// or if the reporter cannot be constructed.
pub(super) fn start(
    service_info: ServiceInfo,
    settings: &JaegerThriftUdpOutputSettings,
    span_rx: SpanReceiver,
) -> BootstrapResult<BoxFuture<'static, BootstrapResult<()>>> {
    let agent_addr = settings.server_addr.into();
    let local_addr = get_reporter_bind_addr(settings)?;

    // Tokio requires the std socket to already be in non-blocking mode before
    // it is handed over via `from_std`.
    let std_socket = std::net::UdpSocket::bind(local_addr)?;
    std_socket.set_nonblocking(true)?;

    let export_task = async move {
        // The conversion happens inside the future rather than in `start`
        // itself, i.e. only once the future is actually polled.
        let transport = tokio::net::UdpSocket::from_std(std_socket)?;

        let mut reporter =
            JaegerCompactReporter::new_with_transport(service_info.name, agent_addr, transport)?;

        let version_tag = Tag::new("app.version", service_info.version);
        reporter.add_service_tag(version_tag);

        do_export(reporter, span_rx).await;

        Ok(())
    };

    Ok(export_task.boxed())
}
/// Chooses the local address the reporter's UDP socket should bind to.
///
/// * If `settings.reporter_bind_addr` is set, it is used as-is, provided it is
///   of the same address family (IPv4 vs. IPv6) as `settings.server_addr`.
/// * Otherwise the loopback address of the server's address family is used
///   with port `0`.
///
/// # Errors
///
/// Returns an error when an explicitly configured bind address and the server
/// address belong to different address families.
fn get_reporter_bind_addr(settings: &JaegerThriftUdpOutputSettings) -> BootstrapResult<SocketAddr> {
    let server_is_ipv6 = settings.server_addr.is_ipv6();

    let bind_addr: SocketAddr = match settings.reporter_bind_addr {
        Some(configured) if configured.is_ipv6() != server_is_ipv6 => {
            bail!("`jaeger_tracing_server_addr` and `jaeger_reporter_bind_addr` must have the same address family")
        }
        Some(configured) => configured.into(),
        None => {
            if server_is_ipv6 {
                (Ipv6Addr::LOCALHOST, 0).into()
            } else {
                (Ipv4Addr::LOCALHOST, 0).into()
            }
        }
    };

    Ok(bind_addr)
}
/// Forwards spans from `span_rx` to `reporter`, one span per report call.
///
/// Runs until `span_rx.recv()` yields `None`. A failed report is passed to
/// `reporter_error` and does not stop the loop.
async fn do_export(reporter: JaegerCompactReporter, mut span_rx: SpanReceiver) {
    while let Some(finished_span) = span_rx.recv().await {
        // Each span is reported on its own as a single-element slice.
        let batch = std::slice::from_ref(&finished_span);

        match reporter.report(batch).await {
            Ok(_) => {}
            Err(err) => {
                reporter_error(err);
            }
        }
    }
}