tracing_elastic_apm/
layer.rs

1use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
2
3use anyhow::Result as AnyResult;
4use rand::prelude::*;
5use serde_json::{json, Value};
6use tracing::{
7    span::{Attributes, Record},
8    Event, Id, Level, Subscriber,
9};
10use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};
11
12use crate::{
13    apm_client::{ApmClient, Batch},
14    config::Config,
15    model::{Agent, Error, Log, Metadata, Service, Span, Transaction},
16    visitor::{ApmVisitor, TraceIdVisitor},
17};
18
19#[derive(Copy, Clone)]
20struct TraceContext {
21    pub trace_id: u128,
22}
23
24struct SpanContext {
25    pub duration: Duration,
26    pub last_timestamp: Instant,
27}
28
29/// Telemetry capability that publishes events and spans to Elastic APM.
30pub struct ApmLayer {
31    client: ApmClient,
32    metadata: Value,
33}
34
35impl<S> Layer<S> for ApmLayer
36where
37    S: Subscriber + for<'lookup> LookupSpan<'lookup>,
38{
39    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
40        let now = SystemTime::now()
41            .duration_since(UNIX_EPOCH)
42            .unwrap()
43            .as_micros() as u64;
44        let timestamp = Instant::now();
45
46        let span = ctx.span(id).expect("Span not found, this is a bug");
47        let mut extensions = span.extensions_mut();
48
49        let mut visitor = ApmVisitor::default();
50        attrs.record(&mut visitor);
51
52        extensions.insert(visitor);
53        extensions.insert(SpanContext {
54            duration: Duration::new(0, 0),
55            last_timestamp: timestamp,
56        });
57
58        let name = span.name().to_string();
59
60        if let Some(parent_id) = span.parent().map(|span_ref| span_ref.id()) {
61            let parent_span = ctx.span(&parent_id).expect("Span parent not found!");
62            let parent_extensions = parent_span.extensions();
63            let trace_ctx = parent_extensions
64                .get::<TraceContext>()
65                .expect("Trace context not found!");
66
67            let new_span = Span {
68                id: id.into_u64().to_string(),
69                trace_id: trace_ctx.trace_id.to_string(),
70                parent_id: parent_id.into_u64().to_string(),
71                timestamp: Some(now),
72                name,
73                span_type: "custom".to_string(),
74                ..Default::default()
75            };
76
77            extensions.insert(new_span);
78            extensions.insert(*trace_ctx);
79        } else {
80            let mut visitor = TraceIdVisitor::default();
81            attrs.record(&mut visitor);
82
83            let trace_ctx = TraceContext {
84                trace_id: visitor.0.unwrap_or_else(random),
85            };
86
87            let new_transaction = Transaction {
88                id: id.into_u64().to_string(),
89                transaction_type: "custom".to_string(),
90                trace_id: trace_ctx.trace_id.to_string(),
91                timestamp: Some(now),
92                name: Some(name),
93                ..Default::default()
94            };
95
96            extensions.insert(new_transaction);
97            extensions.insert(trace_ctx);
98        }
99    }
100
101    fn on_record(&self, span: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
102        let span = ctx.span(span).expect("Span not found!");
103        let mut extensions = span.extensions_mut();
104
105        let visitor = extensions
106            .get_mut::<ApmVisitor>()
107            .expect("Visitor not found!");
108        values.record(visitor);
109    }
110
111    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
112        let metadata = event.metadata();
113        if metadata.level() != &Level::ERROR {
114            return;
115        }
116
117        let parent_id = if let Some(parent_id) = event.parent() {
118            // explicit parent
119            Some(parent_id.clone())
120        } else if event.is_root() {
121            // don't bother checking thread local if span is explicitly root according to this fn
122            None
123        } else {
124            ctx.current_span().id().cloned()
125        };
126
127        if let Some(parent_id) = &parent_id {
128            let span = ctx.span(parent_id).expect("Span not found!");
129            let extensions = span.extensions();
130            let trace_ctx = extensions
131                .get::<TraceContext>()
132                .expect("Trace context not found!");
133
134            let mut visitor = ApmVisitor::default();
135            event.record(&mut visitor);
136
137            let error = Error {
138                id: random::<u128>().to_string(),
139                trace_id: Some(trace_ctx.trace_id.to_string()),
140                parent_id: Some(parent_id.into_u64().to_string()),
141                culprit: Some(metadata.target().to_string()),
142                log: Some(Log {
143                    level: Some(metadata.level().to_string()),
144                    message: visitor
145                        .0
146                        .get("message")
147                        .map(|message| message.to_string())
148                        .unwrap_or_default(),
149                    ..Default::default()
150                }),
151                ..Default::default()
152            };
153
154            let metadata = self.create_metadata(&visitor, metadata);
155            let batch = Batch::new(metadata, None, None, Some(json!(error)));
156            self.client.send_batch(batch);
157        }
158    }
159
160    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
161        let span = ctx.span(id).expect("Span not found!");
162        let mut extensions = span.extensions_mut();
163
164        let span_ctx = extensions
165            .get_mut::<SpanContext>()
166            .expect("Span context not found!");
167
168        span_ctx.last_timestamp = Instant::now();
169    }
170
171    fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
172        let timestamp = Instant::now();
173        let span = ctx.span(id).expect("Span not found!");
174        let mut extensions = span.extensions_mut();
175
176        let span_ctx = extensions
177            .get_mut::<SpanContext>()
178            .expect("Span context not found!");
179
180        span_ctx.duration += timestamp.saturating_duration_since(span_ctx.last_timestamp);
181    }
182
183    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
184        let span = ctx.span(&id).expect("Span not found!");
185        let mut extensions = span.extensions_mut();
186        let visitor = extensions
187            .remove::<ApmVisitor>()
188            .expect("Visitor not found!");
189        let span_ctx = extensions
190            .remove::<SpanContext>()
191            .expect("Span context not found!");
192
193        let metadata = self.create_metadata(&visitor, span.metadata());
194        let duration = span_ctx.duration.as_micros() as f32 / 1000.;
195
196        let batch = if let Some(mut span) = extensions.remove::<Span>() {
197            span.duration = duration;
198            Batch::new(metadata, None, Some(json!(span)), None)
199        } else if let Some(mut transaction) = extensions.remove::<Transaction>() {
200            transaction.duration = duration;
201            Batch::new(metadata, Some(json!(transaction)), None, None)
202        } else {
203            return;
204        };
205
206        self.client.send_batch(batch);
207    }
208}
209
210impl ApmLayer {
211    pub(crate) fn new(mut config: Config, service_name: String) -> AnyResult<Self> {
212        let metadata = Metadata {
213            service: Service {
214                name: service_name,
215                version: config
216                    .service
217                    .as_mut()
218                    .and_then(|service| service.version.take()),
219                environment: config
220                    .service
221                    .as_mut()
222                    .and_then(|service| service.environment.take()),
223                language: config
224                    .service
225                    .as_mut()
226                    .and_then(|service| service.language.take()),
227                runtime: config
228                    .service
229                    .as_mut()
230                    .and_then(|service| service.runtime.take()),
231                framework: config
232                    .service
233                    .as_mut()
234                    .and_then(|service| service.framework.take()),
235                agent: Agent {
236                    name: "tracing-elastic-apm".to_string(),
237                    version: version::version!().to_string(),
238                    ephemeral_id: None,
239                },
240                node: config
241                    .service
242                    .as_mut()
243                    .and_then(|service| service.node.take()),
244            },
245            process: config.process,
246            system: config.system,
247            user: config.user,
248            cloud: config.cloud,
249            labels: None,
250        };
251
252        Ok(ApmLayer {
253            client: ApmClient::new(
254                config.apm_address,
255                config.authorization,
256                config.allow_invalid_certs,
257                config.root_cert_path,
258            )?,
259            metadata: json!(metadata),
260        })
261    }
262
263    fn create_metadata(
264        &self,
265        visitor: &ApmVisitor,
266        meta: &'static tracing::Metadata<'static>,
267    ) -> Value {
268        let mut metadata = self.metadata.clone();
269
270        if !visitor.0.is_empty() {
271            metadata["labels"] = json!(visitor.0);
272            metadata["labels"]["level"] = json!(meta.level().to_string());
273            metadata["labels"]["target"] = json!(meta.target().to_string());
274        }
275
276        metadata
277    }
278}