opentelemetry_user_events_metrics/exporter/
mod.rs

1use opentelemetry::{otel_debug, otel_info};
2use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
3use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
4use opentelemetry_sdk::metrics::data::AggregatedMetrics;
5use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
6use opentelemetry_sdk::metrics::{
7    data::{MetricData, ResourceMetrics},
8    Temporality,
9};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use crate::tracepoint;
13use eventheader::_internal as ehi;
14use prost::Message;
15use std::fmt::{Debug, Formatter};
16use std::pin::Pin;
17
// Maximum encoded payload size (bytes) accepted per tracepoint write; events
// larger than this are dropped (see `encode_and_emit_metric`). The value is
// just under 64 KiB — presumably the difference from 65536 accounts for event
// header overhead; TODO confirm against the user_events kernel limit.
const MAX_EVENT_SIZE: usize = 65360;
19
/// Conversion helpers that let the data-point processing code be generic over
/// the three numeric payload types produced by the SDK (`u64`, `i64`, `f64`).
trait Numeric: Copy {
    // lossy at large values for u64 and i64 but otlp histograms only handle float values
    fn into_f64(self) -> f64;
    /// Wraps the value in the OTLP `NumberDataPoint` value oneof
    /// (`AsInt` for integers, `AsDouble` for floats).
    fn into_number_data_point_value(
        self,
    ) -> opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
}
27
impl Numeric for u64 {
    fn into_f64(self) -> f64 {
        // Lossy above 2^53 (f64 mantissa width).
        self as f64
    }

    fn into_number_data_point_value(
        self,
    ) -> opentelemetry_proto::tonic::metrics::v1::number_data_point::Value {
        // NOTE(review): OTLP only offers a signed integer variant; the `as`
        // cast wraps to negative for values above i64::MAX — confirm this is
        // acceptable for expected metric ranges.
        opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsInt(self as i64)
    }
}
39
impl Numeric for i64 {
    fn into_f64(self) -> f64 {
        // Lossy above 2^53 in magnitude (f64 mantissa width).
        self as f64
    }

    fn into_number_data_point_value(
        self,
    ) -> opentelemetry_proto::tonic::metrics::v1::number_data_point::Value {
        // Exact: OTLP's integer variant is i64.
        opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsInt(self)
    }
}
51
impl Numeric for f64 {
    fn into_f64(self) -> f64 {
        // Identity conversion.
        self
    }

    fn into_number_data_point_value(
        self,
    ) -> opentelemetry_proto::tonic::metrics::v1::number_data_point::Value {
        opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsDouble(self)
    }
}
63
/// Metrics exporter that serializes each metric data point as an OTLP
/// `ExportMetricsServiceRequest` and writes it to a Linux `user_events`
/// tracepoint, one event per data point.
pub struct MetricsExporter {
    // Registered tracepoint state. Pinned in a Box — presumably the
    // registration holds a pointer to this state, so it must not move while
    // registered (see the unregister note in `new`).
    trace_point: Pin<Box<ehi::TracepointState>>,
}
67
impl MetricsExporter {
    /// Creates a new exporter and registers its tracepoint.
    pub fn new() -> MetricsExporter {
        let trace_point = Box::pin(ehi::TracepointState::new(0));
        // This is unsafe because if the code is used in a shared object,
        // the event MUST be unregistered before the shared object unloads.
        unsafe {
            // NOTE(review): a registration failure is silently discarded here;
            // consider logging `_result` to aid diagnosing missing events.
            let _result = tracepoint::register(trace_point.as_ref());
        }
        MetricsExporter { trace_point }
    }
}
79
80impl Default for MetricsExporter {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl Debug for MetricsExporter {
87    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
88        f.write_str("user_events metrics exporter")
89    }
90}
91
/// Converts a `SystemTime` to nanoseconds since the Unix epoch.
/// Times before the epoch saturate to 0 instead of erroring.
fn to_nanos(time: SystemTime) -> u64 {
    match time.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
97
98impl MetricsExporter {
99    fn process_numeric_metrics<T: Numeric>(
100        &self,
101        export_metric_service_request_common: &mut ExportMetricsServiceRequest,
102        byte_array: &mut Vec<u8>,
103        metric: &opentelemetry_sdk::metrics::data::Metric,
104        data: &MetricData<T>,
105    ) -> usize {
106        match data {
107            MetricData::Gauge(gauge) => self.process_gauge(
108                export_metric_service_request_common,
109                byte_array,
110                metric,
111                gauge,
112            ),
113            MetricData::Sum(sum) => self.process_sum(
114                export_metric_service_request_common,
115                byte_array,
116                metric,
117                sum,
118            ),
119            MetricData::Histogram(hist) => self.process_histogram(
120                export_metric_service_request_common,
121                byte_array,
122                metric,
123                hist,
124            ),
125            MetricData::ExponentialHistogram(hist) => self.process_exponential_histogram(
126                export_metric_service_request_common,
127                byte_array,
128                metric,
129                hist,
130            ),
131        }
132    }
133
134    fn process_gauge<T: Numeric>(
135        &self,
136        export_metric_service_request_common: &mut ExportMetricsServiceRequest,
137        byte_array: &mut Vec<u8>,
138        metric: &opentelemetry_sdk::metrics::data::Metric,
139        gauge: &opentelemetry_sdk::metrics::data::Gauge<T>,
140    ) -> usize {
141        // Store and reuse common values for all data points in this gauge
142        let gauge_start_time = gauge.start_time().map(to_nanos).unwrap_or_default();
143        let gauge_time = to_nanos(gauge.time());
144        let default_flags =
145            opentelemetry_proto::tonic::metrics::v1::DataPointFlags::default() as u32;
146
147        // Create metric_proto template outside the loop to reuse name, description, unit
148        let metric_proto = opentelemetry_proto::tonic::metrics::v1::Metric {
149            name: metric.name().to_string(),
150            description: metric.description().to_string(),
151            unit: metric.unit().to_string(),
152            metadata: vec![],
153            data: None,
154        };
155
156        export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics =
157            vec![metric_proto];
158
159        let mut failed_count = 0;
160
161        // ═══════════════════════════════════════════════════════════════════════════════════
162        // LOOP 3: METRIC → DATA POINTS (Individual measurements)
163        // ═══════════════════════════════════════════════════════════════════════════════════
164        // Iterate through each data point within this gauge metric.
165        // Each data point = unique combination of (metric + attributes + timestamp + value).
166        // SERIALIZATION: Each data point is individually encoded and emitted to tracepoint.
167        for dp in gauge.data_points() {
168            let number_data_point = opentelemetry_proto::tonic::metrics::v1::NumberDataPoint {
169                attributes: dp.attributes().map(Into::into).collect(),
170                start_time_unix_nano: gauge_start_time,
171                time_unix_nano: gauge_time,
172                exemplars: Vec::new(), // No support for exemplars
173                flags: default_flags,
174                value: Some(dp.value().into_number_data_point_value()),
175            };
176
177            let gauge_point_proto = opentelemetry_proto::tonic::metrics::v1::Gauge {
178                data_points: vec![number_data_point],
179            };
180
181            // Update the data field for this data point
182            export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics[0]
183                .data = Some(
184                opentelemetry_proto::tonic::metrics::v1::metric::Data::Gauge(gauge_point_proto),
185            );
186
187            byte_array.clear(); // Clear contents but retain capacity for performance
188            if self
189                .encode_and_emit_metric(export_metric_service_request_common, byte_array, metric)
190                .is_err()
191            {
192                failed_count += 1;
193            }
194        }
195        failed_count
196    }
197
198    fn process_sum<T: Numeric>(
199        &self,
200        export_metric_service_request_common: &mut ExportMetricsServiceRequest,
201        byte_array: &mut Vec<u8>,
202        metric: &opentelemetry_sdk::metrics::data::Metric,
203        sum: &opentelemetry_sdk::metrics::data::Sum<T>,
204    ) -> usize {
205        // Pre-compute common values for all data points in this sum
206        let sum_start_time = to_nanos(sum.start_time());
207        let sum_time = to_nanos(sum.time());
208        let sum_is_monotonic = sum.is_monotonic();
209        let default_flags =
210            opentelemetry_proto::tonic::metrics::v1::DataPointFlags::default() as u32;
211
212        // Create metric_proto template outside the loop to reuse name, description, unit
213        let metric_proto = opentelemetry_proto::tonic::metrics::v1::Metric {
214            name: metric.name().to_string(),
215            description: metric.description().to_string(),
216            unit: metric.unit().to_string(),
217            metadata: vec![],
218            data: None,
219        };
220
221        export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics =
222            vec![metric_proto];
223
224        let mut failed_count = 0;
225
226        // ═══════════════════════════════════════════════════════════════════════════════════
227        // LOOP 3: METRIC → DATA POINTS (Individual measurements)
228        // ═══════════════════════════════════════════════════════════════════════════════════
229        // Iterate through each data point within this sum metric.
230        // Each data point = unique combination of (metric + attributes + timestamp + value).
231        // SERIALIZATION: Each data point is individually encoded and emitted to tracepoint.
232        for dp in sum.data_points() {
233            let number_data_point = opentelemetry_proto::tonic::metrics::v1::NumberDataPoint {
234                attributes: dp.attributes().map(Into::into).collect(),
235                start_time_unix_nano: sum_start_time,
236                time_unix_nano: sum_time,
237                exemplars: Vec::new(), // No support for exemplars
238                flags: default_flags,
239                value: Some(dp.value().into_number_data_point_value()),
240            };
241
242            let sum_point_proto = opentelemetry_proto::tonic::metrics::v1::Sum {
243                aggregation_temporality: sum.temporality() as i32,
244                is_monotonic: sum_is_monotonic,
245                data_points: vec![number_data_point],
246            };
247
248            // Update the data field for this data point
249            export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics[0]
250                .data = Some(opentelemetry_proto::tonic::metrics::v1::metric::Data::Sum(
251                sum_point_proto,
252            ));
253
254            byte_array.clear(); // Clear contents but retain capacity for performance
255            if self
256                .encode_and_emit_metric(export_metric_service_request_common, byte_array, metric)
257                .is_err()
258            {
259                failed_count += 1;
260            }
261        }
262        failed_count
263    }
264
    /// Emits every data point of an explicit-bucket histogram as its own
    /// tracepoint event. Returns the number of data points that failed to
    /// encode or write.
    fn process_histogram<T: Numeric>(
        &self,
        export_metric_service_request_common: &mut ExportMetricsServiceRequest,
        byte_array: &mut Vec<u8>,
        metric: &opentelemetry_sdk::metrics::data::Metric,
        hist: &opentelemetry_sdk::metrics::data::Histogram<T>,
    ) -> usize {
        // Pre-compute common values for all data points in this histogram
        let hist_start_time = to_nanos(hist.start_time());
        let hist_time = to_nanos(hist.time());
        let default_flags =
            opentelemetry_proto::tonic::metrics::v1::DataPointFlags::default() as u32;

        // Create metric_proto template outside the loop to reuse name, description, unit
        let metric_proto = opentelemetry_proto::tonic::metrics::v1::Metric {
            name: metric.name().to_string(),
            description: metric.description().to_string(),
            unit: metric.unit().to_string(),
            metadata: vec![],
            data: None,
        };

        export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics =
            vec![metric_proto];

        let mut failed_count = 0;

        // ═══════════════════════════════════════════════════════════════════════════════════
        // LOOP 3: METRIC → DATA POINTS (Individual measurements)
        // ═══════════════════════════════════════════════════════════════════════════════════
        // Iterate through each data point within this histogram metric.
        // Each data point = unique combination of (metric + attributes + timestamp + buckets).
        // SERIALIZATION: Each data point is individually encoded and emitted to tracepoint.
        for dp in hist.data_points() {
            let histogram_data_point =
                opentelemetry_proto::tonic::metrics::v1::HistogramDataPoint {
                    attributes: dp.attributes().map(Into::into).collect(),
                    start_time_unix_nano: hist_start_time,
                    time_unix_nano: hist_time,
                    count: dp.count(),
                    // sum/min/max go through into_f64 — lossy for large u64/i64
                    // values (see Numeric trait comment).
                    sum: Some(dp.sum().into_f64()),
                    bucket_counts: dp.bucket_counts().collect(),
                    explicit_bounds: dp.bounds().collect(),
                    exemplars: Vec::new(), // No support for exemplars
                    flags: default_flags,
                    min: dp.min().map(|v| v.into_f64()),
                    max: dp.max().map(|v| v.into_f64()),
                };

            let histogram_point_proto = opentelemetry_proto::tonic::metrics::v1::Histogram {
                aggregation_temporality: hist.temporality() as i32,
                data_points: vec![histogram_data_point],
            };

            // Update the data field for this data point
            export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics[0]
                .data = Some(
                opentelemetry_proto::tonic::metrics::v1::metric::Data::Histogram(
                    histogram_point_proto,
                ),
            );

            byte_array.clear(); // Clear contents but retain capacity for performance
            if self
                .encode_and_emit_metric(export_metric_service_request_common, byte_array, metric)
                .is_err()
            {
                failed_count += 1;
            }
        }
        failed_count
    }
337
    /// Emits every data point of an exponential histogram as its own
    /// tracepoint event. Returns the number of data points that failed to
    /// encode or write.
    fn process_exponential_histogram<T: Numeric>(
        &self,
        export_metric_service_request_common: &mut ExportMetricsServiceRequest,
        byte_array: &mut Vec<u8>,
        metric: &opentelemetry_sdk::metrics::data::Metric,
        hist: &opentelemetry_sdk::metrics::data::ExponentialHistogram<T>,
    ) -> usize {
        // Pre-compute common values for all data points in this histogram
        let hist_start_time = to_nanos(hist.start_time());
        let hist_time = to_nanos(hist.time());
        let default_flags =
            opentelemetry_proto::tonic::metrics::v1::DataPointFlags::default() as u32;

        // Create metric_proto template outside the loop to reuse name, description, unit
        let metric_proto = opentelemetry_proto::tonic::metrics::v1::Metric {
            name: metric.name().to_string(),
            description: metric.description().to_string(),
            unit: metric.unit().to_string(),
            metadata: vec![],
            data: None,
        };

        export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics =
            vec![metric_proto];

        let mut failed_count = 0;

        // ═══════════════════════════════════════════════════════════════════════════════════
        // LOOP 3: METRIC → DATA POINTS (Individual measurements)
        // ═══════════════════════════════════════════════════════════════════════════════════
        // Iterate through each data point within this exponential histogram metric.
        // Each data point = unique combination of (metric + attributes + timestamp + buckets).
        // SERIALIZATION: Each data point is individually encoded and emitted to tracepoint.
        for dp in hist.data_points() {
            let histogram_data_point = opentelemetry_proto::tonic::metrics::v1::ExponentialHistogramDataPoint {
                attributes: dp.attributes().map(Into::into).collect(),
                start_time_unix_nano: hist_start_time,
                time_unix_nano: hist_time,
                // NOTE(review): usize → u64 cast; lossless on 64-bit targets.
                count: dp.count() as u64,
                // sum/min/max go through into_f64 — lossy for large u64/i64
                // values (see Numeric trait comment).
                sum: Some(dp.sum().into_f64()),
                scale: dp.scale().into(),
                zero_count: dp.zero_count(),
                positive: Some(opentelemetry_proto::tonic::metrics::v1::exponential_histogram_data_point::Buckets {
                    offset: dp.positive_bucket().offset(),
                    bucket_counts: dp.positive_bucket().counts().collect(),
                }),
                negative: Some(opentelemetry_proto::tonic::metrics::v1::exponential_histogram_data_point::Buckets {
                    offset: dp.negative_bucket().offset(),
                    bucket_counts: dp.negative_bucket().counts().collect(),
                }),
                exemplars: Vec::new(), // No support for exemplars
                flags: default_flags,
                min: dp.min().map(|v| v.into_f64()),
                max: dp.max().map(|v| v.into_f64()),
                zero_threshold: dp.zero_threshold(),
            };

            let histogram_point_proto =
                opentelemetry_proto::tonic::metrics::v1::ExponentialHistogram {
                    aggregation_temporality: hist.temporality() as i32,
                    data_points: vec![histogram_data_point],
                };

            // Update the data field for this data point
            export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics[0]
                .data = Some(
                opentelemetry_proto::tonic::metrics::v1::metric::Data::ExponentialHistogram(
                    histogram_point_proto,
                ),
            );

            byte_array.clear(); // Clear contents but retain capacity for performance
            if self
                .encode_and_emit_metric(export_metric_service_request_common, byte_array, metric)
                .is_err()
            {
                failed_count += 1;
            }
        }
        failed_count
    }
419
420    fn encode_and_emit_metric(
421        &self,
422        export_metric_service_request_common: &ExportMetricsServiceRequest,
423        byte_array: &mut Vec<u8>,
424        metric: &opentelemetry_sdk::metrics::data::Metric,
425    ) -> Result<(), String> {
426        match export_metric_service_request_common.encode(byte_array) {
427            Ok(_) => {
428                otel_debug!(name: "SerializationSucceeded", 
429                    metric_name = metric.name(),
430                    size = byte_array.len());
431
432                if byte_array.len() > MAX_EVENT_SIZE {
433                    let error_msg = format!("Encoded event size exceeds maximum allowed limit of {MAX_EVENT_SIZE} bytes. Event will be dropped.");
434                    otel_debug!(
435                        name: "EventSizeExceeded",
436                        reason = &error_msg,
437                        metric_name = metric.name(),
438                        size = byte_array.len()
439                    );
440                    Err(error_msg)
441                } else {
442                    // Write to the tracepoint
443                    let result = tracepoint::write(&self.trace_point, byte_array);
444                    if result == 0 {
445                        otel_debug!(name: "TracepointWriteSucceeded", message = "Encoded data successfully written to tracepoint", size = byte_array.len(), metric_name = metric.name());
446                        Ok(())
447                    } else {
448                        let error_msg = "Failed to write to tracepoint".to_string();
449                        otel_debug!(name: "TracepointWriteFailed", message = &error_msg, metric_name = metric.name(), result = result);
450                        Err(error_msg)
451                    }
452                }
453            }
454            Err(err) => {
455                let error_msg = format!("Serialization failed: {err}");
456                otel_debug!(name: "SerializationFailed",
457                    error = &error_msg,
458                    metric_name = metric.name(),
459                    size = byte_array.len());
460                Err(error_msg)
461            }
462        }
463    }
464
    /// Walks resource → scopes → metrics, delegating each metric to the
    /// type-specific `process_*` handlers, which serialize and emit one
    /// tracepoint event per data point. Returns `Err` if any data point
    /// failed; individual failures are logged at debug level.
    fn export_resource_metrics(&self, resource_metric: &ResourceMetrics) -> OTelSdkResult {
        // Custom transformation to protobuf structs is used instead of upstream
        // transforms because tracepoint has a 64kB size limit. Encoding each
        // data point separately ensures we stay within this limit and avoid
        // data loss. Some upstream transforms are reused where appropriate for
        // consistency. TODO: Optimize by batching multiple data points until
        // the size limit is reached, rather than writing one data point at a
        // time.

        // OVERALL EXPORT FLOW: This method implements a 3-level nested loop
        // structure:
        //
        // LOOP 1: Resource → Scopes (Instrumentation Libraries)
        //   - Iterate through each scope/instrumentation library in the
        //     resource
        //   - Each scope contains multiple metrics from the same library
        //
        // LOOP 2: Scope → Metrics
        //   - For each scope, iterate through all metrics (counters, gauges,
        //     histograms)
        //   - Each metric contains multiple data points with different
        //     attribute combinations
        //
        // LOOP 3: Metric → Data Points
        //   - For each metric, iterate through individual data points
        //   - Each data point represents a unique combination of metric +
        //     attributes + timestamp
        //   - SERIALIZATION HAPPENS HERE: Each data point is encoded and
        //     emitted individually
        //
        // PERFORMANCE OPTIMIZATIONS:
        // - Reuse protobuf structure templates at each level to avoid repeated
        //   allocations
        // - Single byte_array is reused throughout: cleared between uses but
        //   capacity retained
        // - Common values (timestamps, flags, metadata) pre-computed (but not
        //   pre-serialized) per metric to avoid duplication
        // - Only the innermost data varies between serializations, parent
        //   structures stay constant
        let mut byte_array = Vec::new();
        let mut has_failures = false;
        let mut export_metric_service_request_common = ExportMetricsServiceRequest {
            resource_metrics: vec![opentelemetry_proto::tonic::metrics::v1::ResourceMetrics {
                resource: Some((resource_metric.resource()).into()),
                scope_metrics: vec![],
                schema_url: resource_metric
                    .resource()
                    .schema_url()
                    .unwrap_or_default()
                    .to_string(),
            }],
        };

        // ═══════════════════════════════════════════════════════════════════════════════════
        // LOOP 1: RESOURCE → SCOPES (Instrumentation Libraries)
        // ═══════════════════════════════════════════════════════════════════════════════════
        // Iterate through each scope (instrumentation library) within this resource.
        // Each scope groups metrics that originate from the same library/component.
        for scope_metric in resource_metric.scope_metrics() {
            // Create reusable scope_metric_proto template with empty metrics
            let scope_metric_proto = opentelemetry_proto::tonic::metrics::v1::ScopeMetrics {
                scope: Some((scope_metric.scope(), None).into()),
                metrics: vec![],
                schema_url: scope_metric
                    .scope()
                    .schema_url()
                    .unwrap_or_default()
                    .to_string(),
            };

            export_metric_service_request_common.resource_metrics[0].scope_metrics =
                vec![scope_metric_proto];

            // ═══════════════════════════════════════════════════════════════════════════════════
            // LOOP 2: SCOPE → METRICS (Counters, Gauges, Histograms, etc.)
            // ═══════════════════════════════════════════════════════════════════════════════════
            // For each scope, iterate through all metrics of different types.
            // Each metric will be processed by type-specific handlers that implement LOOP 3.
            for metric in scope_metric.metrics() {
                let failed_count = match metric.data() {
                    AggregatedMetrics::F64(data) => {
                        // → DELEGATES TO LOOP 3: process_* methods iterate through data points
                        // → SERIALIZATION: Each data point encoded & emitted individually
                        // → REUSE: Same byte_array cleared & reused for performance
                        self.process_numeric_metrics(
                            &mut export_metric_service_request_common,
                            &mut byte_array,
                            metric,
                            data,
                        )
                    }
                    AggregatedMetrics::U64(data) => self.process_numeric_metrics(
                        &mut export_metric_service_request_common,
                        &mut byte_array,
                        metric,
                        data,
                    ),
                    AggregatedMetrics::I64(data) => self.process_numeric_metrics(
                        &mut export_metric_service_request_common,
                        &mut byte_array,
                        metric,
                        data,
                    ),
                };

                // Log failure counts if any data points failed to export
                if failed_count > 0 {
                    has_failures = true;
                }
            }
        }

        // Even a single failure in the export process is considered a failure of overall export
        // The debug level logs will show exactly which metrics failed
        if has_failures {
            Err(OTelSdkError::InternalFailure(
                "Failed to export some metrics due to serialization or tracepoint write errors"
                    .to_string(),
            ))
        } else {
            Ok(())
        }
    }
588}
589
590impl PushMetricExporter for MetricsExporter {
591    async fn export(&self, resource_metrics: &ResourceMetrics) -> OTelSdkResult {
592        otel_debug!(name: "ExportStarted", message = "Starting metrics export");
593        if !self.trace_point.enabled() {
594            // TODO - This can flood the logs if the tracepoint is disabled for long periods of time
595            otel_info!(name: "TracepointDisabled", message = "Tracepoint is disabled, skipping export");
596            Ok(())
597        } else {
598            self.export_resource_metrics(resource_metrics)
599        }
600    }
601
602    fn temporality(&self) -> Temporality {
603        Temporality::Delta
604    }
605
606    fn force_flush(&self) -> OTelSdkResult {
607        Ok(()) // In this implementation, flush does nothing
608    }
609
610    fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
611        // TracepointState automatically deregisters when dropped
612        // https://github.com/microsoft/LinuxTracepoints-Rust/blob/main/eventheader/src/native.rs#L618
613        Ok(())
614    }
615
616    fn shutdown(&self) -> OTelSdkResult {
617        self.shutdown_with_timeout(Duration::from_secs(5))
618    }
619}