1use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
2
3use anyhow::Result as AnyResult;
4use rand::prelude::*;
5use serde_json::{json, Value};
6use tracing::{
7 span::{Attributes, Record},
8 Event, Id, Level, Subscriber,
9};
10use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};
11
12use crate::{
13 apm_client::{ApmClient, Batch},
14 config::Config,
15 model::{Agent, Error, Log, Metadata, Service, Span, Transaction},
16 visitor::{ApmVisitor, TraceIdVisitor},
17};
18
/// Trace identity shared by every span that belongs to the same trace.
///
/// Stored in a span's extensions and copied from parent to child when a
/// child span is created.
#[derive(Clone, Copy)]
struct TraceContext {
    /// 128-bit identifier of the trace this span belongs to.
    pub trace_id: u128,
}
23
/// Per-span timing state kept in the span's extensions.
struct SpanContext {
    /// Total time accumulated across all enter/exit pairs so far.
    pub duration: Duration,
    /// Monotonic instant recorded at span creation and refreshed on each enter.
    pub last_timestamp: Instant,
}
28
29pub struct ApmLayer {
31 client: ApmClient,
32 metadata: Value,
33}
34
35impl<S> Layer<S> for ApmLayer
36where
37 S: Subscriber + for<'lookup> LookupSpan<'lookup>,
38{
39 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
40 let now = SystemTime::now()
41 .duration_since(UNIX_EPOCH)
42 .unwrap()
43 .as_micros() as u64;
44 let timestamp = Instant::now();
45
46 let span = ctx.span(id).expect("Span not found, this is a bug");
47 let mut extensions = span.extensions_mut();
48
49 let mut visitor = ApmVisitor::default();
50 attrs.record(&mut visitor);
51
52 extensions.insert(visitor);
53 extensions.insert(SpanContext {
54 duration: Duration::new(0, 0),
55 last_timestamp: timestamp,
56 });
57
58 let name = span.name().to_string();
59
60 if let Some(parent_id) = span.parent().map(|span_ref| span_ref.id()) {
61 let parent_span = ctx.span(&parent_id).expect("Span parent not found!");
62 let parent_extensions = parent_span.extensions();
63 let trace_ctx = parent_extensions
64 .get::<TraceContext>()
65 .expect("Trace context not found!");
66
67 let new_span = Span {
68 id: id.into_u64().to_string(),
69 trace_id: trace_ctx.trace_id.to_string(),
70 parent_id: parent_id.into_u64().to_string(),
71 timestamp: Some(now),
72 name,
73 span_type: "custom".to_string(),
74 ..Default::default()
75 };
76
77 extensions.insert(new_span);
78 extensions.insert(*trace_ctx);
79 } else {
80 let mut visitor = TraceIdVisitor::default();
81 attrs.record(&mut visitor);
82
83 let trace_ctx = TraceContext {
84 trace_id: visitor.0.unwrap_or_else(random),
85 };
86
87 let new_transaction = Transaction {
88 id: id.into_u64().to_string(),
89 transaction_type: "custom".to_string(),
90 trace_id: trace_ctx.trace_id.to_string(),
91 timestamp: Some(now),
92 name: Some(name),
93 ..Default::default()
94 };
95
96 extensions.insert(new_transaction);
97 extensions.insert(trace_ctx);
98 }
99 }
100
101 fn on_record(&self, span: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
102 let span = ctx.span(span).expect("Span not found!");
103 let mut extensions = span.extensions_mut();
104
105 let visitor = extensions
106 .get_mut::<ApmVisitor>()
107 .expect("Visitor not found!");
108 values.record(visitor);
109 }
110
111 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
112 let metadata = event.metadata();
113 if metadata.level() != &Level::ERROR {
114 return;
115 }
116
117 let parent_id = if let Some(parent_id) = event.parent() {
118 Some(parent_id.clone())
120 } else if event.is_root() {
121 None
123 } else {
124 ctx.current_span().id().cloned()
125 };
126
127 if let Some(parent_id) = &parent_id {
128 let span = ctx.span(parent_id).expect("Span not found!");
129 let extensions = span.extensions();
130 let trace_ctx = extensions
131 .get::<TraceContext>()
132 .expect("Trace context not found!");
133
134 let mut visitor = ApmVisitor::default();
135 event.record(&mut visitor);
136
137 let error = Error {
138 id: random::<u128>().to_string(),
139 trace_id: Some(trace_ctx.trace_id.to_string()),
140 parent_id: Some(parent_id.into_u64().to_string()),
141 culprit: Some(metadata.target().to_string()),
142 log: Some(Log {
143 level: Some(metadata.level().to_string()),
144 message: visitor
145 .0
146 .get("message")
147 .map(|message| message.to_string())
148 .unwrap_or_default(),
149 ..Default::default()
150 }),
151 ..Default::default()
152 };
153
154 let metadata = self.create_metadata(&visitor, metadata);
155 let batch = Batch::new(metadata, None, None, Some(json!(error)));
156 self.client.send_batch(batch);
157 }
158 }
159
160 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
161 let span = ctx.span(id).expect("Span not found!");
162 let mut extensions = span.extensions_mut();
163
164 let span_ctx = extensions
165 .get_mut::<SpanContext>()
166 .expect("Span context not found!");
167
168 span_ctx.last_timestamp = Instant::now();
169 }
170
171 fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
172 let timestamp = Instant::now();
173 let span = ctx.span(id).expect("Span not found!");
174 let mut extensions = span.extensions_mut();
175
176 let span_ctx = extensions
177 .get_mut::<SpanContext>()
178 .expect("Span context not found!");
179
180 span_ctx.duration += timestamp.saturating_duration_since(span_ctx.last_timestamp);
181 }
182
183 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
184 let span = ctx.span(&id).expect("Span not found!");
185 let mut extensions = span.extensions_mut();
186 let visitor = extensions
187 .remove::<ApmVisitor>()
188 .expect("Visitor not found!");
189 let span_ctx = extensions
190 .remove::<SpanContext>()
191 .expect("Span context not found!");
192
193 let metadata = self.create_metadata(&visitor, span.metadata());
194 let duration = span_ctx.duration.as_micros() as f32 / 1000.;
195
196 let batch = if let Some(mut span) = extensions.remove::<Span>() {
197 span.duration = duration;
198 Batch::new(metadata, None, Some(json!(span)), None)
199 } else if let Some(mut transaction) = extensions.remove::<Transaction>() {
200 transaction.duration = duration;
201 Batch::new(metadata, Some(json!(transaction)), None, None)
202 } else {
203 return;
204 };
205
206 self.client.send_batch(batch);
207 }
208}
209
210impl ApmLayer {
211 pub(crate) fn new(mut config: Config, service_name: String) -> AnyResult<Self> {
212 let metadata = Metadata {
213 service: Service {
214 name: service_name,
215 version: config
216 .service
217 .as_mut()
218 .and_then(|service| service.version.take()),
219 environment: config
220 .service
221 .as_mut()
222 .and_then(|service| service.environment.take()),
223 language: config
224 .service
225 .as_mut()
226 .and_then(|service| service.language.take()),
227 runtime: config
228 .service
229 .as_mut()
230 .and_then(|service| service.runtime.take()),
231 framework: config
232 .service
233 .as_mut()
234 .and_then(|service| service.framework.take()),
235 agent: Agent {
236 name: "tracing-elastic-apm".to_string(),
237 version: version::version!().to_string(),
238 ephemeral_id: None,
239 },
240 node: config
241 .service
242 .as_mut()
243 .and_then(|service| service.node.take()),
244 },
245 process: config.process,
246 system: config.system,
247 user: config.user,
248 cloud: config.cloud,
249 labels: None,
250 };
251
252 Ok(ApmLayer {
253 client: ApmClient::new(
254 config.apm_address,
255 config.authorization,
256 config.allow_invalid_certs,
257 config.root_cert_path,
258 )?,
259 metadata: json!(metadata),
260 })
261 }
262
263 fn create_metadata(
264 &self,
265 visitor: &ApmVisitor,
266 meta: &'static tracing::Metadata<'static>,
267 ) -> Value {
268 let mut metadata = self.metadata.clone();
269
270 if !visitor.0.is_empty() {
271 metadata["labels"] = json!(visitor.0);
272 metadata["labels"]["level"] = json!(meta.level().to_string());
273 metadata["labels"]["target"] = json!(meta.target().to_string());
274 }
275
276 metadata
277 }
278}