muta_apm/
lib.rs

1//!
2
3pub use muta_apm_derive as derive;
4pub use rustracing;
5pub use rustracing_jaeger;
6
7use std::borrow::Cow;
8use std::net::SocketAddr;
9
10use parking_lot::RwLock;
11use rustracing::sampler::AllSampler;
12use rustracing::tag::Tag;
13use rustracing_jaeger::reporter::JaegerCompactReporter;
14use rustracing_jaeger::span::{
15    Span, SpanContext, SpanContextState, SpanContextStateBuilder, TraceId,
16};
17use rustracing_jaeger::Tracer;
18
19const SPAN_CHANNEL_SIZE: usize = 1024 * 1024;
20const DEFAULT_SPAN_BATCH_SIZE: usize = 20;
21
22lazy_static::lazy_static! {
23    pub static ref MUTA_TRACER: MutaTracer = MutaTracer::new();
24}
25
26pub fn global_tracer_register(service_name: &str, udp_addr: SocketAddr, batch_size: Option<usize>) {
27    let (span_tx, span_rx) = crossbeam_channel::bounded(SPAN_CHANNEL_SIZE);
28    let batch_size = batch_size.unwrap_or_else(|| DEFAULT_SPAN_BATCH_SIZE);
29    let mut reporter = JaegerCompactReporter::new(service_name).unwrap();
30    let mut tracer = MUTA_TRACER.inner.write();
31    *tracer = Some(Tracer::with_sender(AllSampler, span_tx));
32
33    reporter
34        .set_agent_addr(udp_addr);
35
36    let mut batch_spans = Vec::with_capacity(batch_size + 1);
37    std::thread::spawn(move || loop {
38        if let Ok(finished_span) = span_rx.recv() {
39            batch_spans.push(finished_span);
40
41            if batch_spans.len() >= batch_size {
42                let enough_spans = batch_spans.drain(..).collect::<Vec<_>>();
43                if let Err(err) = reporter.report(&enough_spans) {
44                    log::warn!("jaeger report {}", err);
45                }
46            }
47        }
48    });
49}
50
51pub struct MutaTracer {
52    pub(crate) inner: RwLock<Option<Tracer>>,
53}
54
55impl MutaTracer {
56    pub fn new() -> Self {
57        MutaTracer {
58            inner: RwLock::new(None),
59        }
60    }
61
62    pub fn child_of_span<N: Into<Cow<'static, str>>>(
63        &self,
64        opt_name: N,
65        parent_ctx: SpanContext,
66        tags: Vec<Tag>,
67    ) -> Option<Span> {
68        match self.inner.read().as_ref() {
69            Some(inner) => {
70                let mut span = inner.span(opt_name);
71                for tag in tags.into_iter() {
72                    span = span.tag(tag);
73                }
74                Some(span.child_of(&parent_ctx).start())
75            }
76            None => None,
77        }
78    }
79
80    pub fn span<N: Into<Cow<'static, str>>>(&self, opt_name: N, tags: Vec<Tag>) -> Option<Span> {
81        match self.inner.read().as_ref() {
82            Some(inner) => {
83                let mut span = inner.span(opt_name);
84                for tag in tags.into_iter() {
85                    span = span.tag(tag);
86                }
87                Some(span.start())
88            }
89            None => None,
90        }
91    }
92
93    pub fn new_state(trace_id: TraceId, span_id: u64) -> SpanContextState {
94        SpanContextStateBuilder::new()
95            .trace_id(trace_id)
96            .span_id(span_id)
97            .finish()
98    }
99
100    pub fn span_state(ctx: &creep::Context) -> Option<SpanContextState> {
101        if let Some(Some(parent_ctx)) = ctx.get::<Option<SpanContext>>("parent_span_ctx") {
102            Some(parent_ctx.state().to_owned())
103        } else {
104            None
105        }
106    }
107
108    pub fn inject_span_state(ctx: creep::Context, span_state: SpanContextState) -> creep::Context {
109        let span = SpanContext::new(span_state, vec![]);
110        ctx.with_value::<Option<SpanContext>>("parent_span_ctx", Some(span))
111    }
112}