opentelemetry_user_events_metrics/exporter/mod.rs
use opentelemetry::{otel_debug, otel_warn};
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::metrics::data;
use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
use opentelemetry_sdk::metrics::{
    data::{
        ExponentialBucket, ExponentialHistogramDataPoint, Metric, ResourceMetrics, ScopeMetrics,
    },
    Temporality,
};

use crate::tracepoint;
use eventheader::_internal as ehi;
use prost::Message;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;

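// Maximum payload size for a single user_events event (65360 bytes, just
// under the 64 KiB event limit); larger serialized metrics are dropped.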
const MAX_EVENT_SIZE: usize = 65360;

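/// Exporter that serializes metrics as OTLP `ExportMetricsServiceRequest`
/// protobufs and writes them to a Linux `user_events` tracepoint.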
pub struct MetricsExporter {
    trace_point: Pin<Box<ehi::TracepointState>>,
}

impl MetricsExporter {
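    /// Creates the exporter and registers its tracepoint with the kernel.
    ///
    /// A minimal wiring sketch (illustrative only; assumes the
    /// `opentelemetry_sdk` `PeriodicReader`/`SdkMeterProvider` builder APIs
    /// and a kernel with `user_events` enabled):
    ///
    /// ```ignore
    /// use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
    ///
    /// let exporter = MetricsExporter::new();
    /// let reader = PeriodicReader::builder(exporter).build();
    /// let provider = SdkMeterProvider::builder().with_reader(reader).build();
    /// ```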
    pub fn new() -> MetricsExporter {
        let trace_point = Box::pin(ehi::TracepointState::new(0));
        // This is unsafe because if the code is used in a shared object,
        // the event MUST be unregistered before the shared object unloads.
        unsafe {
            let _result = tracepoint::register(trace_point.as_ref());
        }
        MetricsExporter { trace_point }
    }
}

impl Default for MetricsExporter {
    fn default() -> Self {
        Self::new()
    }
}

impl Debug for MetricsExporter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("user_events metrics exporter")
    }
}

impl MetricsExporter {
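    /// Serializes a single-data-point `ResourceMetrics` into an OTLP
    /// `ExportMetricsServiceRequest` and writes the encoded bytes to the
    /// tracepoint, dropping payloads that exceed `MAX_EVENT_SIZE`.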
    fn serialize_and_write(
        &self,
        resource_metric: &ResourceMetrics,
        metric_name: &str,
        metric_type: &str,
    ) -> OTelSdkResult {
        // Allocate a local buffer for each write operation
        // TODO: Investigate if this can be optimized to avoid reallocation or
        // allocate a fixed buffer size for all writes
        let mut byte_array = Vec::new();

        // Convert to proto message
        let proto_message: ExportMetricsServiceRequest = resource_metric.into();
        otel_debug!(name: "SerializeStart",
            metric_name = metric_name,
            metric_type = metric_type);

        // Encode directly into the buffer
        match proto_message.encode(&mut byte_array) {
            Ok(_) => {
                otel_debug!(name: "SerializeSuccess",
                    metric_name = metric_name,
                    metric_type = metric_type,
                    size = byte_array.len());
            }
            Err(err) => {
                otel_debug!(name: "SerializeFailed",
                    error = err.to_string(),
                    metric_name = metric_name,
                    metric_type = metric_type,
                    size = byte_array.len());
                return Err(OTelSdkError::InternalFailure(err.to_string()));
            }
        }

        // Drop the event if the encoded message exceeds the maximum event size (just under 64 KiB)
        if byte_array.len() > MAX_EVENT_SIZE {
            otel_debug!(
                name: "MaxEventSizeExceeded",
                reason = format!("Encoded event size exceeds maximum allowed limit of {} bytes. Event will be dropped.", MAX_EVENT_SIZE),
                metric_name = metric_name,
                metric_type = metric_type,
                size = byte_array.len()
            );
            return Err(OTelSdkError::InternalFailure(
                "Event size exceeds maximum allowed limit".into(),
            ));
        }

        // Write to the tracepoint
        let result = tracepoint::write(&self.trace_point, &byte_array);
        if result > 0 {
            otel_debug!(name: "TracepointWrite", message = "Encoded data successfully written to tracepoint", size = byte_array.len(), metric_name = metric_name, metric_type = metric_type);
        }

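        // Note: a non-positive write result is not surfaced as an error here;
        // the call still reports success.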
        Ok(())
    }
}

impl PushMetricExporter for MetricsExporter {
    async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
        otel_debug!(name: "ExportStart", message = "Starting metrics export");
        if !self.trace_point.enabled() {
            // TODO - This can flood the logs if the tracepoint is disabled for long periods of time
            otel_warn!(name: "TracepointDisabled", message = "Tracepoint is disabled, skipping export");
            return Ok(());
        } else {
            let mut errors = Vec::new();

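            // Fan out: each data point is re-wrapped in its own single-metric
            // ResourceMetrics and written as a separate tracepoint event.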
            for scope_metric in &metrics.scope_metrics {
                for metric in &scope_metric.metrics {
                    let data = &metric.data.as_any();

                    if let Some(histogram) = data.downcast_ref::<data::Histogram<u64>>() {
                        for data_point in &histogram.data_points {
                            let resource_metric = ResourceMetrics {
                                resource: metrics.resource.clone(),
                                scope_metrics: vec![ScopeMetrics {
                                    scope: scope_metric.scope.clone(),
                                    metrics: vec![Metric {
                                        name: metric.name.clone(),
                                        description: metric.description.clone(),
                                        unit: metric.unit.clone(),
                                        data: Box::new(data::Histogram {
                                            temporality: histogram.temporality,
                                            start_time: histogram.start_time,
                                            time: histogram.time,
                                            data_points: vec![data_point.clone()],
                                        }),
                                    }],
                                }],
                            };
                            if let Err(e) = self.serialize_and_write(
                                &resource_metric,
                                &metric.name,
                                "Histogram<u64>",
                            ) {
                                errors.push(e.to_string());
                            }
                        }
                    } else if let Some(histogram) = data.downcast_ref::<data::Histogram<f64>>() {
                        for data_point in &histogram.data_points {
                            let resource_metric = ResourceMetrics {
                                resource: metrics.resource.clone(),
                                scope_metrics: vec![ScopeMetrics {
                                    scope: scope_metric.scope.clone(),
                                    metrics: vec![Metric {
                                        name: metric.name.clone(),
                                        description: metric.description.clone(),
                                        unit: metric.unit.clone(),
                                        data: Box::new(data::Histogram {
                                            temporality: histogram.temporality,
                                            start_time: histogram.start_time,
                                            time: histogram.time,
                                            data_points: vec![data_point.clone()],
                                        }),
                                    }],
                                }],
                            };
                            if let Err(e) = self.serialize_and_write(
                                &resource_metric,
                                &metric.name,
                                "Histogram<f64>",
                            ) {
                                errors.push(e.to_string());
                            }
                        }
                    } else if let Some(gauge) = data.downcast_ref::<data::Gauge<u64>>() {
                        for data_point in &gauge.data_points {
                            let resource_metric = ResourceMetrics {
                                resource: metrics.resource.clone(),
                                scope_metrics: vec![ScopeMetrics {
                                    scope: scope_metric.scope.clone(),
                                    metrics: vec![Metric {
                                        name: metric.name.clone(),
                                        description: metric.description.clone(),
                                        unit: metric.unit.clone(),
                                        data: Box::new(data::Gauge {
                                            data_points: vec![data_point.clone()],
                                            start_time: gauge.start_time,
                                            time: gauge.time,
                                        }),
                                    }],
                                }],
                            };
                            if let Err(e) = self.serialize_and_write(
                                &resource_metric,
                                &metric.name,
                                "Gauge<u64>",
                            ) {
                                errors.push(e.to_string());
                            }
                        }
                    } else if let Some(gauge) = data.downcast_ref::<data::Gauge<i64>>() {
                        for data_point in &gauge.data_points {
                            let resource_metric = ResourceMetrics {
                                resource: metrics.resource.clone(),
                                scope_metrics: vec![ScopeMetrics {
                                    scope: scope_metric.scope.clone(),
                                    metrics: vec![Metric {
                                        name: metric.name.clone(),
                                        description: metric.description.clone(),
                                        unit: metric.unit.clone(),
                                        data: Box::new(data::Gauge {
                                            data_points: vec![data_point.clone()],
                                            start_time: gauge.start_time,
                                            time: gauge.time,
                                        }),
                                    }],
                                }],
                            };
                            if let Err(e) = self.serialize_and_write(
                                &resource_metric,
                                &metric.name,
                                "Gauge<i64>",
                            ) {
                                errors.push(e.to_string());
                            }
                        }
                    } else if let Some(gauge) = data.downcast_ref::<data::Gauge<f64>>() {
                        for data_point in &gauge.data_points {
                            let resource_metric = ResourceMetrics {
                                resource: metrics.resource.clone(),
                                scope_metrics: vec![ScopeMetrics {
                                    scope: scope_metric.scope.clone(),
                                    metrics: vec![Metric {
                                        name: metric.name.clone(),
                                        description: metric.description.clone(),
                                        unit: metric.unit.clone(),
                                        data: Box::new(data::Gauge {
                                            data_points: vec![data_point.clone()],
                                            start_time: gauge.start_time,
                                            time: gauge.time,
                                        }),
                                    }],
                                }],
                            };
                            if let Err(e) = self.serialize_and_write(
                                &resource_metric,
                                &metric.name,
                                "Gauge<f64>",
                            ) {
                                errors.push(e.to_string());
                            }
                        }
                    } else if let Some(sum) = data.downcast_ref::<data::Sum<u64>>() {
                        for data_point in &sum.data_points {
                            let resource_metric = ResourceMetrics {
                                resource: metrics.resource.clone(),
                                scope_metrics: vec![ScopeMetrics {
                                    scope: scope_metric.scope.clone(),
                                    metrics: vec![Metric {
                                        name: metric.name.clone(),
                                        description: metric.description.clone(),
                                        unit: metric.unit.clone(),
                                        data: Box::new(data::Sum {
                                            temporality: sum.temporality,
                                            data_points: vec![data_point.clone()],
                                            is_monotonic: sum.is_monotonic,
                                            start_time: sum.start_time,
                                            time: sum.time,
                                        }),
                                    }],
                                }],
                            };
                            if let Err(e) =
                                self.serialize_and_write(&resource_metric, &metric.name, "Sum<u64>")
                            {
                                errors.push(e.to_string());
                            }
                        }
                    } else if let Some(sum) = data.downcast_ref::<data::Sum<i64>>() {
                        for data_point in &sum.data_points {
                            let resource_metric = ResourceMetrics {
                                resource: metrics.resource.clone(),
                                scope_metrics: vec![ScopeMetrics {
                                    scope: scope_metric.scope.clone(),
                                    metrics: vec![Metric {
                                        name: metric.name.clone(),
                                        description: metric.description.clone(),
                                        unit: metric.unit.clone(),
                                        data: Box::new(data::Sum {
                                            temporality: sum.temporality,
                                            data_points: vec![data_point.clone()],
                                            is_monotonic: sum.is_monotonic,
                                            start_time: sum.start_time,
                                            time: sum.time,
                                        }),
                                    }],
                                }],
                            };
                            if let Err(e) =
                                self.serialize_and_write(&resource_metric, &metric.name, "Sum<i64>")
                            {
                                errors.push(e.to_string());
                            }
                        }
                    } else if let Some(sum) = data.downcast_ref::<data::Sum<f64>>() {
                        for data_point in &sum.data_points {
                            let resource_metric = ResourceMetrics {
                                resource: metrics.resource.clone(),
                                scope_metrics: vec![ScopeMetrics {
                                    scope: scope_metric.scope.clone(),
                                    metrics: vec![Metric {
                                        name: metric.name.clone(),
                                        description: metric.description.clone(),
                                        unit: metric.unit.clone(),
                                        data: Box::new(data::Sum {
                                            temporality: sum.temporality,
                                            data_points: vec![data_point.clone()],
                                            is_monotonic: sum.is_monotonic,
                                            start_time: sum.start_time,
                                            time: sum.time,
                                        }),
                                    }],
                                }],
                            };
                            if let Err(e) =
                                self.serialize_and_write(&resource_metric, &metric.name, "Sum<f64>")
                            {
                                errors.push(e.to_string());
                            }
                        }
                    } else if let Some(exp_hist) =
                        data.downcast_ref::<data::ExponentialHistogram<u64>>()
                    {
                        for data_point in &exp_hist.data_points {
                            let resource_metric = ResourceMetrics {
                                resource: metrics.resource.clone(),
                                scope_metrics: vec![ScopeMetrics {
                                    scope: scope_metric.scope.clone(),
                                    metrics: vec![Metric {
                                        name: metric.name.clone(),
                                        description: metric.description.clone(),
                                        unit: metric.unit.clone(),
                                        data: Box::new(data::ExponentialHistogram {
                                            temporality: exp_hist.temporality,
                                            start_time: exp_hist.start_time,
                                            time: exp_hist.time,
                                            data_points: vec![ExponentialHistogramDataPoint {
                                                attributes: data_point.attributes.clone(),
                                                count: data_point.count,
                                                min: data_point.min,
                                                max: data_point.max,
                                                sum: data_point.sum,
                                                scale: data_point.scale,
                                                zero_count: data_point.zero_count,
                                                zero_threshold: data_point.zero_threshold,
                                                positive_bucket: ExponentialBucket {
                                                    offset: data_point.positive_bucket.offset,
                                                    counts: data_point
                                                        .positive_bucket
                                                        .counts
                                                        .clone(),
                                                },
                                                negative_bucket: ExponentialBucket {
                                                    offset: data_point.negative_bucket.offset,
                                                    counts: data_point
                                                        .negative_bucket
                                                        .counts
                                                        .clone(),
                                                },
                                                exemplars: data_point.exemplars.clone(),
                                            }],
                                        }),
                                    }],
                                }],
                            };
                            if let Err(e) = self.serialize_and_write(
                                &resource_metric,
                                &metric.name,
                                "ExponentialHistogram<u64>",
                            ) {
                                errors.push(e.to_string());
                            }
                        }
                    } else if let Some(exp_hist) =
                        data.downcast_ref::<data::ExponentialHistogram<f64>>()
                    {
                        for data_point in &exp_hist.data_points {
                            let resource_metric = ResourceMetrics {
                                resource: metrics.resource.clone(),
                                scope_metrics: vec![ScopeMetrics {
                                    scope: scope_metric.scope.clone(),
                                    metrics: vec![Metric {
                                        name: metric.name.clone(),
                                        description: metric.description.clone(),
                                        unit: metric.unit.clone(),
                                        data: Box::new(data::ExponentialHistogram {
                                            temporality: exp_hist.temporality,
                                            start_time: exp_hist.start_time,
                                            time: exp_hist.time,
                                            data_points: vec![ExponentialHistogramDataPoint {
                                                attributes: data_point.attributes.clone(),
                                                count: data_point.count,
                                                min: data_point.min,
                                                max: data_point.max,
                                                sum: data_point.sum,
                                                scale: data_point.scale,
                                                zero_count: data_point.zero_count,
                                                zero_threshold: data_point.zero_threshold,
                                                positive_bucket: ExponentialBucket {
                                                    offset: data_point.positive_bucket.offset,
                                                    counts: data_point
                                                        .positive_bucket
                                                        .counts
                                                        .clone(),
                                                },
                                                negative_bucket: ExponentialBucket {
                                                    offset: data_point.negative_bucket.offset,
                                                    counts: data_point
                                                        .negative_bucket
                                                        .counts
                                                        .clone(),
                                                },
                                                exemplars: data_point.exemplars.clone(),
                                            }],
                                        }),
                                    }],
                                }],
                            };
                            if let Err(e) = self.serialize_and_write(
                                &resource_metric,
                                &metric.name,
                                "ExponentialHistogram<f64>",
                            ) {
                                errors.push(e.to_string());
                            }
                        }
                    }
                }
            }

            // Return any errors if present
            if !errors.is_empty() {
                let error_message = format!(
                    "Export encountered {} errors: [{}]",
                    errors.len(),
                    errors.join("; ")
                );
                return Err(OTelSdkError::InternalFailure(error_message));
            }
        }
        Ok(())
    }

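    /// This exporter always reports delta temporality.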
    fn temporality(&self) -> Temporality {
        Temporality::Delta
    }

    fn force_flush(&self) -> OTelSdkResult {
        Ok(()) // Flush is a no-op: export() writes synchronously to the tracepoint
    }

    fn shutdown(&self) -> OTelSdkResult {
        // TracepointState automatically deregisters when dropped
        // https://github.com/microsoft/LinuxTracepoints-Rust/blob/main/eventheader/src/native.rs#L618
        Ok(())
    }
}