1pub use muta_apm_derive as derive;
4pub use rustracing;
5pub use rustracing_jaeger;
6
7use std::borrow::Cow;
8use std::net::SocketAddr;
9
10use parking_lot::RwLock;
11use rustracing::sampler::AllSampler;
12use rustracing::tag::Tag;
13use rustracing_jaeger::reporter::JaegerCompactReporter;
14use rustracing_jaeger::span::{
15 Span, SpanContext, SpanContextState, SpanContextStateBuilder, TraceId,
16};
17use rustracing_jaeger::Tracer;
18
// Capacity of the bounded channel that carries finished spans from the tracer
// to the background reporter thread.
const SPAN_CHANNEL_SIZE: usize = 1024 * 1024;
// Number of spans collected per report when the caller does not supply a batch size.
const DEFAULT_SPAN_BATCH_SIZE: usize = 20;
21
lazy_static::lazy_static! {
    /// Process-wide tracer handle, built lazily through `MutaTracer::new()`.
    ///
    /// It holds no underlying tracer until `global_tracer_register` installs one.
    pub static ref MUTA_TRACER: MutaTracer = MutaTracer::new();
}
25
26pub fn global_tracer_register(service_name: &str, udp_addr: SocketAddr, batch_size: Option<usize>) {
27 let (span_tx, span_rx) = crossbeam_channel::bounded(SPAN_CHANNEL_SIZE);
28 let batch_size = batch_size.unwrap_or_else(|| DEFAULT_SPAN_BATCH_SIZE);
29 let mut reporter = JaegerCompactReporter::new(service_name).unwrap();
30 let mut tracer = MUTA_TRACER.inner.write();
31 *tracer = Some(Tracer::with_sender(AllSampler, span_tx));
32
33 reporter
34 .set_agent_addr(udp_addr);
35
36 let mut batch_spans = Vec::with_capacity(batch_size + 1);
37 std::thread::spawn(move || loop {
38 if let Ok(finished_span) = span_rx.recv() {
39 batch_spans.push(finished_span);
40
41 if batch_spans.len() >= batch_size {
42 let enough_spans = batch_spans.drain(..).collect::<Vec<_>>();
43 if let Err(err) = reporter.report(&enough_spans) {
44 log::warn!("jaeger report {}", err);
45 }
46 }
47 }
48 });
49}
50
/// Handle around an optional Jaeger `Tracer` guarded by a read-write lock.
pub struct MutaTracer {
    // `None` means no tracer is installed; the span-creating methods then
    // return `None` as well.
    pub(crate) inner: RwLock<Option<Tracer>>,
}
54
55impl MutaTracer {
56 pub fn new() -> Self {
57 MutaTracer {
58 inner: RwLock::new(None),
59 }
60 }
61
62 pub fn child_of_span<N: Into<Cow<'static, str>>>(
63 &self,
64 opt_name: N,
65 parent_ctx: SpanContext,
66 tags: Vec<Tag>,
67 ) -> Option<Span> {
68 match self.inner.read().as_ref() {
69 Some(inner) => {
70 let mut span = inner.span(opt_name);
71 for tag in tags.into_iter() {
72 span = span.tag(tag);
73 }
74 Some(span.child_of(&parent_ctx).start())
75 }
76 None => None,
77 }
78 }
79
80 pub fn span<N: Into<Cow<'static, str>>>(&self, opt_name: N, tags: Vec<Tag>) -> Option<Span> {
81 match self.inner.read().as_ref() {
82 Some(inner) => {
83 let mut span = inner.span(opt_name);
84 for tag in tags.into_iter() {
85 span = span.tag(tag);
86 }
87 Some(span.start())
88 }
89 None => None,
90 }
91 }
92
93 pub fn new_state(trace_id: TraceId, span_id: u64) -> SpanContextState {
94 SpanContextStateBuilder::new()
95 .trace_id(trace_id)
96 .span_id(span_id)
97 .finish()
98 }
99
100 pub fn span_state(ctx: &creep::Context) -> Option<SpanContextState> {
101 if let Some(Some(parent_ctx)) = ctx.get::<Option<SpanContext>>("parent_span_ctx") {
102 Some(parent_ctx.state().to_owned())
103 } else {
104 None
105 }
106 }
107
108 pub fn inject_span_state(ctx: creep::Context, span_state: SpanContextState) -> creep::Context {
109 let span = SpanContext::new(span_state, vec![]);
110 ctx.with_value::<Option<SpanContext>>("parent_span_ctx", Some(span))
111 }
112}