opentelemetry_user_events_metrics/exporter/mod.rs
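//! user_events metrics exporter.
//!
//! Serializes each metric data point as an OTLP `ExportMetricsServiceRequest`
//! (via `opentelemetry-proto`/`prost`) and writes the encoded bytes to a
//! Linux user_events tracepoint. Events larger than `MAX_EVENT_SIZE` are
//! dropped, and the exporter reports Delta temporality.
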
use async_trait::async_trait;
use opentelemetry::{otel_debug, otel_warn};
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::metrics::data;
use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
use opentelemetry_sdk::metrics::{
    data::{
        ExponentialBucket, ExponentialHistogramDataPoint, Metric, ResourceMetrics, ScopeMetrics,
    },
    Temporality,
};

use crate::tracepoint;
use eventheader::_internal as ehi;
use prost::Message;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;

// Maximum size in bytes of a single encoded event (just under 64 KiB);
// anything larger is dropped in `serialize_and_write`.
const MAX_EVENT_SIZE: usize = 65360;

/// Exports OTLP-encoded metrics through a Linux user_events tracepoint.
pub struct MetricsExporter {
    trace_point: Pin<Box<ehi::TracepointState>>,
}

impl MetricsExporter {
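    /// Creates the exporter and registers its user_events tracepoint.
    ///
    /// Usage sketch (illustrative only — the reader/provider builder APIs
    /// shown here vary across opentelemetry_sdk versions, so treat the exact
    /// calls as assumptions rather than this crate's documented API):
    ///
    /// ```ignore
    /// use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
    ///
    /// let exporter = MetricsExporter::new();
    /// let reader = PeriodicReader::builder(exporter).build();
    /// let provider = SdkMeterProvider::builder().with_reader(reader).build();
    /// ```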
    pub fn new() -> MetricsExporter {
        let trace_point = Box::pin(ehi::TracepointState::new(0));
        // This is unsafe because if the code is used in a shared object,
        // the event MUST be unregistered before the shared object unloads.
        unsafe {
            // Registration failures are ignored here; if registration did not
            // succeed, the tracepoint stays disabled and export is a no-op.
            let _result = tracepoint::register(trace_point.as_ref());
        }
        MetricsExporter { trace_point }
    }
}

impl Default for MetricsExporter {
    fn default() -> Self {
        Self::new()
    }
}

impl Debug for MetricsExporter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("user_events metrics exporter")
    }
}

impl MetricsExporter {
    /// Encodes a single-data-point `ResourceMetrics` as an OTLP protobuf
    /// message and writes it to the tracepoint, dropping events that exceed
    /// `MAX_EVENT_SIZE`.
    fn serialize_and_write(
        &self,
        resource_metric: &ResourceMetrics,
        metric_name: &str,
        metric_type: &str,
    ) -> OTelSdkResult {
        // Allocate a local buffer for each write operation
        // TODO: Investigate if this can be optimized to avoid reallocation or
        // allocate a fixed buffer size for all writes
        let mut byte_array = Vec::new();

        // Convert to proto message
        let proto_message: ExportMetricsServiceRequest = resource_metric.into();
        otel_debug!(name: "SerializeStart",
            metric_name = metric_name,
            metric_type = metric_type);

        // Encode directly into the buffer
        match proto_message.encode(&mut byte_array) {
            Ok(_) => {
                otel_debug!(name: "SerializeSuccess",
                    metric_name = metric_name,
                    metric_type = metric_type,
                    size = byte_array.len());
            }
            Err(err) => {
                otel_debug!(name: "SerializeFailed",
                    error = err.to_string(),
                    metric_name = metric_name,
                    metric_type = metric_type,
                    size = byte_array.len());
                return Err(OTelSdkError::InternalFailure(err.to_string()));
            }
        }

        // Drop the event if the encoded message exceeds MAX_EVENT_SIZE
        // (65,360 bytes, just under 64 KiB)
        if byte_array.len() > MAX_EVENT_SIZE {
            otel_debug!(
                name: "MaxEventSizeExceeded",
                reason = format!("Encoded event size exceeds maximum allowed limit of {} bytes. Event will be dropped.", MAX_EVENT_SIZE),
                metric_name = metric_name,
                metric_type = metric_type,
                size = byte_array.len()
            );
            return Err(OTelSdkError::InternalFailure(
                "Event size exceeds maximum allowed limit".into(),
            ));
        }

        // Write to the tracepoint; a non-positive result is not treated as an
        // error, it simply goes unlogged.
        let result = tracepoint::write(&self.trace_point, &byte_array);
        if result > 0 {
            otel_debug!(name: "TracepointWrite",
                message = "Encoded data successfully written to tracepoint",
                size = byte_array.len(),
                metric_name = metric_name,
                metric_type = metric_type);
        }

        Ok(())
    }
}

#[async_trait]
impl PushMetricExporter for MetricsExporter {
    async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
        otel_debug!(name: "ExportStart", message = "Starting metrics export");
        if !self.trace_point.enabled() {
            // TODO - This can flood the logs if the tracepoint is disabled for long periods of time
            otel_warn!(name: "TracepointDisabled", message = "Tracepoint is disabled, skipping export");
            return Ok(());
        }

        // Collect per-data-point errors instead of failing fast, so one bad
        // metric does not prevent the rest from being exported.
        let mut errors = Vec::new();
        for scope_metric in &metrics.scope_metrics {
            for metric in &scope_metric.metrics {
                let data = metric.data.as_any();

                // Each data point is re-wrapped in its own single-point
                // ResourceMetrics and exported as a separate event.
                if let Some(histogram) = data.downcast_ref::<data::Histogram<u64>>() {
                    for data_point in &histogram.data_points {
                        let resource_metric = ResourceMetrics {
                            resource: metrics.resource.clone(),
                            scope_metrics: vec![ScopeMetrics {
                                scope: scope_metric.scope.clone(),
                                metrics: vec![Metric {
                                    name: metric.name.clone(),
                                    description: metric.description.clone(),
                                    unit: metric.unit.clone(),
                                    data: Box::new(data::Histogram {
                                        temporality: histogram.temporality,
                                        start_time: histogram.start_time,
                                        time: histogram.time,
                                        data_points: vec![data_point.clone()],
                                    }),
                                }],
                            }],
                        };
                        if let Err(e) = self.serialize_and_write(
                            &resource_metric,
                            &metric.name,
                            "Histogram<u64>",
                        ) {
                            errors.push(e.to_string());
                        }
                    }
                } else if let Some(histogram) = data.downcast_ref::<data::Histogram<f64>>() {
                    for data_point in &histogram.data_points {
                        let resource_metric = ResourceMetrics {
                            resource: metrics.resource.clone(),
                            scope_metrics: vec![ScopeMetrics {
                                scope: scope_metric.scope.clone(),
                                metrics: vec![Metric {
                                    name: metric.name.clone(),
                                    description: metric.description.clone(),
                                    unit: metric.unit.clone(),
                                    data: Box::new(data::Histogram {
                                        temporality: histogram.temporality,
                                        start_time: histogram.start_time,
                                        time: histogram.time,
                                        data_points: vec![data_point.clone()],
                                    }),
                                }],
                            }],
                        };
                        if let Err(e) = self.serialize_and_write(
                            &resource_metric,
                            &metric.name,
                            "Histogram<f64>",
                        ) {
                            errors.push(e.to_string());
                        }
                    }
                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<u64>>() {
                    for data_point in &gauge.data_points {
                        let resource_metric = ResourceMetrics {
                            resource: metrics.resource.clone(),
                            scope_metrics: vec![ScopeMetrics {
                                scope: scope_metric.scope.clone(),
                                metrics: vec![Metric {
                                    name: metric.name.clone(),
                                    description: metric.description.clone(),
                                    unit: metric.unit.clone(),
                                    data: Box::new(data::Gauge {
                                        data_points: vec![data_point.clone()],
                                        start_time: gauge.start_time,
                                        time: gauge.time,
                                    }),
                                }],
                            }],
                        };
                        if let Err(e) = self.serialize_and_write(
                            &resource_metric,
                            &metric.name,
                            "Gauge<u64>",
                        ) {
                            errors.push(e.to_string());
                        }
                    }
                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<i64>>() {
                    for data_point in &gauge.data_points {
                        let resource_metric = ResourceMetrics {
                            resource: metrics.resource.clone(),
                            scope_metrics: vec![ScopeMetrics {
                                scope: scope_metric.scope.clone(),
                                metrics: vec![Metric {
                                    name: metric.name.clone(),
                                    description: metric.description.clone(),
                                    unit: metric.unit.clone(),
                                    data: Box::new(data::Gauge {
                                        data_points: vec![data_point.clone()],
                                        start_time: gauge.start_time,
                                        time: gauge.time,
                                    }),
                                }],
                            }],
                        };
                        if let Err(e) = self.serialize_and_write(
                            &resource_metric,
                            &metric.name,
                            "Gauge<i64>",
                        ) {
                            errors.push(e.to_string());
                        }
                    }
                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<f64>>() {
                    for data_point in &gauge.data_points {
                        let resource_metric = ResourceMetrics {
                            resource: metrics.resource.clone(),
                            scope_metrics: vec![ScopeMetrics {
                                scope: scope_metric.scope.clone(),
                                metrics: vec![Metric {
                                    name: metric.name.clone(),
                                    description: metric.description.clone(),
                                    unit: metric.unit.clone(),
                                    data: Box::new(data::Gauge {
                                        data_points: vec![data_point.clone()],
                                        start_time: gauge.start_time,
                                        time: gauge.time,
                                    }),
                                }],
                            }],
                        };
                        if let Err(e) = self.serialize_and_write(
                            &resource_metric,
                            &metric.name,
                            "Gauge<f64>",
                        ) {
                            errors.push(e.to_string());
                        }
                    }
                } else if let Some(sum) = data.downcast_ref::<data::Sum<u64>>() {
                    for data_point in &sum.data_points {
                        let resource_metric = ResourceMetrics {
                            resource: metrics.resource.clone(),
                            scope_metrics: vec![ScopeMetrics {
                                scope: scope_metric.scope.clone(),
                                metrics: vec![Metric {
                                    name: metric.name.clone(),
                                    description: metric.description.clone(),
                                    unit: metric.unit.clone(),
                                    data: Box::new(data::Sum {
                                        temporality: sum.temporality,
                                        data_points: vec![data_point.clone()],
                                        is_monotonic: sum.is_monotonic,
                                        start_time: sum.start_time,
                                        time: sum.time,
                                    }),
                                }],
                            }],
                        };
                        if let Err(e) =
                            self.serialize_and_write(&resource_metric, &metric.name, "Sum<u64>")
                        {
                            errors.push(e.to_string());
                        }
                    }
                } else if let Some(sum) = data.downcast_ref::<data::Sum<i64>>() {
                    for data_point in &sum.data_points {
                        let resource_metric = ResourceMetrics {
                            resource: metrics.resource.clone(),
                            scope_metrics: vec![ScopeMetrics {
                                scope: scope_metric.scope.clone(),
                                metrics: vec![Metric {
                                    name: metric.name.clone(),
                                    description: metric.description.clone(),
                                    unit: metric.unit.clone(),
                                    data: Box::new(data::Sum {
                                        temporality: sum.temporality,
                                        data_points: vec![data_point.clone()],
                                        is_monotonic: sum.is_monotonic,
                                        start_time: sum.start_time,
                                        time: sum.time,
                                    }),
                                }],
                            }],
                        };
                        if let Err(e) =
                            self.serialize_and_write(&resource_metric, &metric.name, "Sum<i64>")
                        {
                            errors.push(e.to_string());
                        }
                    }
                } else if let Some(sum) = data.downcast_ref::<data::Sum<f64>>() {
                    for data_point in &sum.data_points {
                        let resource_metric = ResourceMetrics {
                            resource: metrics.resource.clone(),
                            scope_metrics: vec![ScopeMetrics {
                                scope: scope_metric.scope.clone(),
                                metrics: vec![Metric {
                                    name: metric.name.clone(),
                                    description: metric.description.clone(),
                                    unit: metric.unit.clone(),
                                    data: Box::new(data::Sum {
                                        temporality: sum.temporality,
                                        data_points: vec![data_point.clone()],
                                        is_monotonic: sum.is_monotonic,
                                        start_time: sum.start_time,
                                        time: sum.time,
                                    }),
                                }],
                            }],
                        };
                        if let Err(e) =
                            self.serialize_and_write(&resource_metric, &metric.name, "Sum<f64>")
                        {
                            errors.push(e.to_string());
                        }
                    }
                } else if let Some(exp_hist) =
                    data.downcast_ref::<data::ExponentialHistogram<u64>>()
                {
                    for data_point in &exp_hist.data_points {
                        let resource_metric = ResourceMetrics {
                            resource: metrics.resource.clone(),
                            scope_metrics: vec![ScopeMetrics {
                                scope: scope_metric.scope.clone(),
                                metrics: vec![Metric {
                                    name: metric.name.clone(),
                                    description: metric.description.clone(),
                                    unit: metric.unit.clone(),
                                    data: Box::new(data::ExponentialHistogram {
                                        temporality: exp_hist.temporality,
                                        start_time: exp_hist.start_time,
                                        time: exp_hist.time,
                                        data_points: vec![ExponentialHistogramDataPoint {
                                            attributes: data_point.attributes.clone(),
                                            count: data_point.count,
                                            min: data_point.min,
                                            max: data_point.max,
                                            sum: data_point.sum,
                                            scale: data_point.scale,
                                            zero_count: data_point.zero_count,
                                            zero_threshold: data_point.zero_threshold,
                                            positive_bucket: ExponentialBucket {
                                                offset: data_point.positive_bucket.offset,
                                                counts: data_point.positive_bucket.counts.clone(),
                                            },
                                            negative_bucket: ExponentialBucket {
                                                offset: data_point.negative_bucket.offset,
                                                counts: data_point.negative_bucket.counts.clone(),
                                            },
                                            exemplars: data_point.exemplars.clone(),
                                        }],
                                    }),
                                }],
                            }],
                        };
                        if let Err(e) = self.serialize_and_write(
                            &resource_metric,
                            &metric.name,
                            "ExponentialHistogram<u64>",
                        ) {
                            errors.push(e.to_string());
                        }
                    }
                } else if let Some(exp_hist) =
                    data.downcast_ref::<data::ExponentialHistogram<f64>>()
                {
                    for data_point in &exp_hist.data_points {
                        let resource_metric = ResourceMetrics {
                            resource: metrics.resource.clone(),
                            scope_metrics: vec![ScopeMetrics {
                                scope: scope_metric.scope.clone(),
                                metrics: vec![Metric {
                                    name: metric.name.clone(),
                                    description: metric.description.clone(),
                                    unit: metric.unit.clone(),
                                    data: Box::new(data::ExponentialHistogram {
                                        temporality: exp_hist.temporality,
                                        start_time: exp_hist.start_time,
                                        time: exp_hist.time,
                                        data_points: vec![ExponentialHistogramDataPoint {
                                            attributes: data_point.attributes.clone(),
                                            count: data_point.count,
                                            min: data_point.min,
                                            max: data_point.max,
                                            sum: data_point.sum,
                                            scale: data_point.scale,
                                            zero_count: data_point.zero_count,
                                            zero_threshold: data_point.zero_threshold,
                                            positive_bucket: ExponentialBucket {
                                                offset: data_point.positive_bucket.offset,
                                                counts: data_point.positive_bucket.counts.clone(),
                                            },
                                            negative_bucket: ExponentialBucket {
                                                offset: data_point.negative_bucket.offset,
                                                counts: data_point.negative_bucket.counts.clone(),
                                            },
                                            exemplars: data_point.exemplars.clone(),
                                        }],
                                    }),
                                }],
                            }],
                        };
                        if let Err(e) = self.serialize_and_write(
                            &resource_metric,
                            &metric.name,
                            "ExponentialHistogram<f64>",
                        ) {
                            errors.push(e.to_string());
                        }
                    }
                }
                // Aggregation kinds not matched above are silently skipped.
            }
        }

        // Surface all per-data-point failures as a single aggregate error
        if !errors.is_empty() {
            let error_message = format!(
                "Export encountered {} errors: [{}]",
                errors.len(),
                errors.join("; ")
            );
            return Err(OTelSdkError::InternalFailure(error_message));
        }
        Ok(())
    }

    fn temporality(&self) -> Temporality {
        Temporality::Delta
    }

    async fn force_flush(&self) -> OTelSdkResult {
        Ok(()) // Nothing is buffered; writes happen synchronously during export
    }

    fn shutdown(&self) -> OTelSdkResult {
        // TracepointState automatically unregisters when dropped
        // https://github.com/microsoft/LinuxTracepoints-Rust/blob/main/eventheader/src/native.rs#L618
        Ok(())
    }
}
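
// A minimal smoke-test sketch (illustrative, not part of the original
// module). It assumes `Temporality` implements `PartialEq` and that
// constructing the exporter succeeds even when tracepoint registration is
// unavailable (registration errors are ignored in `new`).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn exporter_defaults() {
        let exporter = MetricsExporter::new();
        // The exporter always reports Delta temporality.
        assert_eq!(exporter.temporality(), Temporality::Delta);
        // Shutdown is a no-op; the tracepoint unregisters on drop.
        assert!(exporter.shutdown().is_ok());
    }
}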