1#![doc = include_str!("../README.md")]
4
5mod thrift;
6
7use std::error::Error;
8use std::net::SocketAddr;
9use std::net::UdpSocket;
10
11use minitrace::collector::Reporter;
12use minitrace::prelude::*;
13use thrift::Log;
14use thrift_codec::message::Message;
15use thrift_codec::CompactEncode;
16
17use crate::thrift::Batch;
18use crate::thrift::EmitBatchNotification;
19use crate::thrift::JaegerSpan;
20use crate::thrift::Process;
21use crate::thrift::Tag;
22
/// A [`Reporter`] that forwards collected spans to a Jaeger agent over UDP.
///
/// Construct it with [`JaegerReporter::new`].
#[derive(Debug)]
pub struct JaegerReporter {
    /// Destination address every encoded batch is sent to.
    agent_addr: SocketAddr,
    /// Service name attached to the `Process` of every emitted batch.
    service_name: String,
    /// Local UDP socket used to send the encoded batches.
    socket: UdpSocket,
}
29
30impl JaegerReporter {
31 pub fn new(
32 agent_addr: SocketAddr,
33 service_name: impl Into<String>,
34 ) -> Result<Self, Box<dyn Error + Send + Sync + 'static>> {
35 let local_addr: SocketAddr = if agent_addr.is_ipv4() {
36 "0.0.0.0:0"
37 } else {
38 "[::]:0"
39 }
40 .parse()
41 .unwrap();
42 let socket = std::net::UdpSocket::bind(local_addr)?;
43
44 Ok(Self {
45 agent_addr,
46 service_name: service_name.into(),
47 socket,
48 })
49 }
50
51 fn convert(&self, spans: &[SpanRecord]) -> Vec<JaegerSpan> {
52 spans
53 .iter()
54 .map(move |s| JaegerSpan {
55 trace_id_high: (s.trace_id.0 >> 64) as i64,
56 trace_id_low: s.trace_id.0 as i64,
57 span_id: s.span_id.0 as i64,
58 parent_span_id: s.parent_id.0 as i64,
59 operation_name: s.name.to_string(),
60 references: vec![],
61 flags: 1,
62 start_time: (s.begin_time_unix_ns / 1_000) as i64,
63 duration: (s.duration_ns / 1_000) as i64,
64 tags: s
65 .properties
66 .iter()
67 .map(|(k, v)| Tag::String {
68 key: k.to_string(),
69 value: v.to_string(),
70 })
71 .collect(),
72 logs: s
73 .events
74 .iter()
75 .map(|event| Log {
76 timestamp: (event.timestamp_unix_ns / 1_000) as i64,
77 fields: [("name".into(), event.name.clone())]
78 .iter()
79 .chain(&event.properties)
80 .map(|(k, v)| Tag::String {
81 key: k.to_string(),
82 value: v.to_string(),
83 })
84 .collect(),
85 })
86 .collect(),
87 })
88 .collect()
89 }
90
91 fn serialize(&self, spans: Vec<JaegerSpan>) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
92 let bn = EmitBatchNotification {
93 batch: Batch {
94 process: Process {
95 service_name: self.service_name.clone(),
96 tags: vec![],
97 },
98 spans,
99 },
100 };
101
102 let mut bytes = Vec::new();
103 let msg = Message::from(bn);
104 msg.compact_encode(&mut bytes)?;
105
106 Ok(bytes)
107 }
108
109 fn try_report(&self, spans: &[SpanRecord]) -> Result<(), Box<dyn std::error::Error>> {
110 const MAX_UDP_PACKAGE_SIZE: usize = 8000;
111
112 let mut spans_per_batch = spans.len();
113 let mut sent_spans = 0;
114
115 while sent_spans < spans.len() {
116 let batch_size = spans_per_batch.min(spans.len() - sent_spans);
117 let jaeger_spans = self.convert(&spans[sent_spans..sent_spans + batch_size]);
118 let bytes = self.serialize(jaeger_spans)?;
119 if bytes.len() >= MAX_UDP_PACKAGE_SIZE {
120 if batch_size <= 1 {
121 sent_spans += 1;
122 } else {
123 spans_per_batch /= 2;
124 }
125 continue;
126 }
127 self.socket.send_to(&bytes, self.agent_addr)?;
128 sent_spans += batch_size;
129 }
130
131 Ok(())
132 }
133}
134
135impl Reporter for JaegerReporter {
136 fn report(&mut self, spans: &[SpanRecord]) {
137 if spans.is_empty() {
138 return;
139 }
140
141 if let Err(err) = self.try_report(spans) {
142 log::error!("report to jaeger failed: {}", err);
143 }
144 }
145}