minitrace_jaeger/
lib.rs

1// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
2
3#![doc = include_str!("../README.md")]
4
5mod thrift;
6
7use std::error::Error;
8use std::net::SocketAddr;
9use std::net::UdpSocket;
10
11use minitrace::collector::Reporter;
12use minitrace::prelude::*;
13use thrift::Log;
14use thrift_codec::message::Message;
15use thrift_codec::CompactEncode;
16
17use crate::thrift::Batch;
18use crate::thrift::EmitBatchNotification;
19use crate::thrift::JaegerSpan;
20use crate::thrift::Process;
21use crate::thrift::Tag;
22
/// [Jaeger](https://www.jaegertracing.io/) reporter for `minitrace` via UDP endpoint.
///
/// Spans are encoded as Thrift compact-protocol batches and sent as UDP
/// datagrams to the configured agent address.
#[derive(Debug)]
pub struct JaegerReporter {
    /// Destination address every encoded batch is sent to.
    agent_addr: SocketAddr,
    /// Service name attached to the `Process` of every emitted batch.
    service_name: String,
    /// Local UDP socket used to send the datagrams.
    socket: UdpSocket,
}
29
30impl JaegerReporter {
31    pub fn new(
32        agent_addr: SocketAddr,
33        service_name: impl Into<String>,
34    ) -> Result<Self, Box<dyn Error + Send + Sync + 'static>> {
35        let local_addr: SocketAddr = if agent_addr.is_ipv4() {
36            "0.0.0.0:0"
37        } else {
38            "[::]:0"
39        }
40        .parse()
41        .unwrap();
42        let socket = std::net::UdpSocket::bind(local_addr)?;
43
44        Ok(Self {
45            agent_addr,
46            service_name: service_name.into(),
47            socket,
48        })
49    }
50
51    fn convert(&self, spans: &[SpanRecord]) -> Vec<JaegerSpan> {
52        spans
53            .iter()
54            .map(move |s| JaegerSpan {
55                trace_id_high: (s.trace_id.0 >> 64) as i64,
56                trace_id_low: s.trace_id.0 as i64,
57                span_id: s.span_id.0 as i64,
58                parent_span_id: s.parent_id.0 as i64,
59                operation_name: s.name.to_string(),
60                references: vec![],
61                flags: 1,
62                start_time: (s.begin_time_unix_ns / 1_000) as i64,
63                duration: (s.duration_ns / 1_000) as i64,
64                tags: s
65                    .properties
66                    .iter()
67                    .map(|(k, v)| Tag::String {
68                        key: k.to_string(),
69                        value: v.to_string(),
70                    })
71                    .collect(),
72                logs: s
73                    .events
74                    .iter()
75                    .map(|event| Log {
76                        timestamp: (event.timestamp_unix_ns / 1_000) as i64,
77                        fields: [("name".into(), event.name.clone())]
78                            .iter()
79                            .chain(&event.properties)
80                            .map(|(k, v)| Tag::String {
81                                key: k.to_string(),
82                                value: v.to_string(),
83                            })
84                            .collect(),
85                    })
86                    .collect(),
87            })
88            .collect()
89    }
90
91    fn serialize(&self, spans: Vec<JaegerSpan>) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
92        let bn = EmitBatchNotification {
93            batch: Batch {
94                process: Process {
95                    service_name: self.service_name.clone(),
96                    tags: vec![],
97                },
98                spans,
99            },
100        };
101
102        let mut bytes = Vec::new();
103        let msg = Message::from(bn);
104        msg.compact_encode(&mut bytes)?;
105
106        Ok(bytes)
107    }
108
109    fn try_report(&self, spans: &[SpanRecord]) -> Result<(), Box<dyn std::error::Error>> {
110        const MAX_UDP_PACKAGE_SIZE: usize = 8000;
111
112        let mut spans_per_batch = spans.len();
113        let mut sent_spans = 0;
114
115        while sent_spans < spans.len() {
116            let batch_size = spans_per_batch.min(spans.len() - sent_spans);
117            let jaeger_spans = self.convert(&spans[sent_spans..sent_spans + batch_size]);
118            let bytes = self.serialize(jaeger_spans)?;
119            if bytes.len() >= MAX_UDP_PACKAGE_SIZE {
120                if batch_size <= 1 {
121                    sent_spans += 1;
122                } else {
123                    spans_per_batch /= 2;
124                }
125                continue;
126            }
127            self.socket.send_to(&bytes, self.agent_addr)?;
128            sent_spans += batch_size;
129        }
130
131        Ok(())
132    }
133}
134
135impl Reporter for JaegerReporter {
136    fn report(&mut self, spans: &[SpanRecord]) {
137        if spans.is_empty() {
138            return;
139        }
140
141        if let Err(err) = self.try_report(spans) {
142            log::error!("report to jaeger failed: {}", err);
143        }
144    }
145}