1use opentelemetry::{otel_debug, otel_info};
2use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
3use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
4use opentelemetry_sdk::metrics::data::AggregatedMetrics;
5use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
6use opentelemetry_sdk::metrics::{
7 data::{MetricData, ResourceMetrics},
8 Temporality,
9};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use crate::tracepoint;
13use eventheader::_internal as ehi;
14use prost::Message;
15use std::fmt::{Debug, Formatter};
16use std::pin::Pin;
17
18const MAX_EVENT_SIZE: usize = 65360;
19
20trait Numeric: Copy {
21 fn into_f64(self) -> f64;
23 fn into_number_data_point_value(
24 self,
25 ) -> opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
26}
27
28impl Numeric for u64 {
29 fn into_f64(self) -> f64 {
30 self as f64
31 }
32
33 fn into_number_data_point_value(
34 self,
35 ) -> opentelemetry_proto::tonic::metrics::v1::number_data_point::Value {
36 opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsInt(self as i64)
37 }
38}
39
40impl Numeric for i64 {
41 fn into_f64(self) -> f64 {
42 self as f64
43 }
44
45 fn into_number_data_point_value(
46 self,
47 ) -> opentelemetry_proto::tonic::metrics::v1::number_data_point::Value {
48 opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsInt(self)
49 }
50}
51
52impl Numeric for f64 {
53 fn into_f64(self) -> f64 {
54 self
55 }
56
57 fn into_number_data_point_value(
58 self,
59 ) -> opentelemetry_proto::tonic::metrics::v1::number_data_point::Value {
60 opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsDouble(self)
61 }
62}
63
64pub struct MetricsExporter {
65 trace_point: Pin<Box<ehi::TracepointState>>,
66}
67
68impl MetricsExporter {
69 pub fn new() -> MetricsExporter {
70 let trace_point = Box::pin(ehi::TracepointState::new(0));
71 unsafe {
74 let _result = tracepoint::register(trace_point.as_ref());
75 }
76 MetricsExporter { trace_point }
77 }
78}
79
80impl Default for MetricsExporter {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl Debug for MetricsExporter {
87 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
88 f.write_str("user_events metrics exporter")
89 }
90}
91
92fn to_nanos(time: SystemTime) -> u64 {
93 time.duration_since(UNIX_EPOCH)
94 .unwrap_or_else(|_| Duration::from_secs(0))
95 .as_nanos() as u64
96}
97
98impl MetricsExporter {
99 fn process_numeric_metrics<T: Numeric>(
100 &self,
101 export_metric_service_request_common: &mut ExportMetricsServiceRequest,
102 byte_array: &mut Vec<u8>,
103 metric: &opentelemetry_sdk::metrics::data::Metric,
104 data: &MetricData<T>,
105 ) -> usize {
106 match data {
107 MetricData::Gauge(gauge) => self.process_gauge(
108 export_metric_service_request_common,
109 byte_array,
110 metric,
111 gauge,
112 ),
113 MetricData::Sum(sum) => self.process_sum(
114 export_metric_service_request_common,
115 byte_array,
116 metric,
117 sum,
118 ),
119 MetricData::Histogram(hist) => self.process_histogram(
120 export_metric_service_request_common,
121 byte_array,
122 metric,
123 hist,
124 ),
125 MetricData::ExponentialHistogram(hist) => self.process_exponential_histogram(
126 export_metric_service_request_common,
127 byte_array,
128 metric,
129 hist,
130 ),
131 }
132 }
133
134 fn process_gauge<T: Numeric>(
135 &self,
136 export_metric_service_request_common: &mut ExportMetricsServiceRequest,
137 byte_array: &mut Vec<u8>,
138 metric: &opentelemetry_sdk::metrics::data::Metric,
139 gauge: &opentelemetry_sdk::metrics::data::Gauge<T>,
140 ) -> usize {
141 let gauge_start_time = gauge.start_time().map(to_nanos).unwrap_or_default();
143 let gauge_time = to_nanos(gauge.time());
144 let default_flags =
145 opentelemetry_proto::tonic::metrics::v1::DataPointFlags::default() as u32;
146
147 let metric_proto = opentelemetry_proto::tonic::metrics::v1::Metric {
149 name: metric.name().to_string(),
150 description: metric.description().to_string(),
151 unit: metric.unit().to_string(),
152 metadata: vec![],
153 data: None,
154 };
155
156 export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics =
157 vec![metric_proto];
158
159 let mut failed_count = 0;
160
161 for dp in gauge.data_points() {
168 let number_data_point = opentelemetry_proto::tonic::metrics::v1::NumberDataPoint {
169 attributes: dp.attributes().map(Into::into).collect(),
170 start_time_unix_nano: gauge_start_time,
171 time_unix_nano: gauge_time,
172 exemplars: Vec::new(), flags: default_flags,
174 value: Some(dp.value().into_number_data_point_value()),
175 };
176
177 let gauge_point_proto = opentelemetry_proto::tonic::metrics::v1::Gauge {
178 data_points: vec![number_data_point],
179 };
180
181 export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics[0]
183 .data = Some(
184 opentelemetry_proto::tonic::metrics::v1::metric::Data::Gauge(gauge_point_proto),
185 );
186
187 byte_array.clear(); if self
189 .encode_and_emit_metric(export_metric_service_request_common, byte_array, metric)
190 .is_err()
191 {
192 failed_count += 1;
193 }
194 }
195 failed_count
196 }
197
198 fn process_sum<T: Numeric>(
199 &self,
200 export_metric_service_request_common: &mut ExportMetricsServiceRequest,
201 byte_array: &mut Vec<u8>,
202 metric: &opentelemetry_sdk::metrics::data::Metric,
203 sum: &opentelemetry_sdk::metrics::data::Sum<T>,
204 ) -> usize {
205 let sum_start_time = to_nanos(sum.start_time());
207 let sum_time = to_nanos(sum.time());
208 let sum_is_monotonic = sum.is_monotonic();
209 let default_flags =
210 opentelemetry_proto::tonic::metrics::v1::DataPointFlags::default() as u32;
211
212 let metric_proto = opentelemetry_proto::tonic::metrics::v1::Metric {
214 name: metric.name().to_string(),
215 description: metric.description().to_string(),
216 unit: metric.unit().to_string(),
217 metadata: vec![],
218 data: None,
219 };
220
221 export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics =
222 vec![metric_proto];
223
224 let mut failed_count = 0;
225
226 for dp in sum.data_points() {
233 let number_data_point = opentelemetry_proto::tonic::metrics::v1::NumberDataPoint {
234 attributes: dp.attributes().map(Into::into).collect(),
235 start_time_unix_nano: sum_start_time,
236 time_unix_nano: sum_time,
237 exemplars: Vec::new(), flags: default_flags,
239 value: Some(dp.value().into_number_data_point_value()),
240 };
241
242 let sum_point_proto = opentelemetry_proto::tonic::metrics::v1::Sum {
243 aggregation_temporality: sum.temporality() as i32,
244 is_monotonic: sum_is_monotonic,
245 data_points: vec![number_data_point],
246 };
247
248 export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics[0]
250 .data = Some(opentelemetry_proto::tonic::metrics::v1::metric::Data::Sum(
251 sum_point_proto,
252 ));
253
254 byte_array.clear(); if self
256 .encode_and_emit_metric(export_metric_service_request_common, byte_array, metric)
257 .is_err()
258 {
259 failed_count += 1;
260 }
261 }
262 failed_count
263 }
264
265 fn process_histogram<T: Numeric>(
266 &self,
267 export_metric_service_request_common: &mut ExportMetricsServiceRequest,
268 byte_array: &mut Vec<u8>,
269 metric: &opentelemetry_sdk::metrics::data::Metric,
270 hist: &opentelemetry_sdk::metrics::data::Histogram<T>,
271 ) -> usize {
272 let hist_start_time = to_nanos(hist.start_time());
274 let hist_time = to_nanos(hist.time());
275 let default_flags =
276 opentelemetry_proto::tonic::metrics::v1::DataPointFlags::default() as u32;
277
278 let metric_proto = opentelemetry_proto::tonic::metrics::v1::Metric {
280 name: metric.name().to_string(),
281 description: metric.description().to_string(),
282 unit: metric.unit().to_string(),
283 metadata: vec![],
284 data: None,
285 };
286
287 export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics =
288 vec![metric_proto];
289
290 let mut failed_count = 0;
291
292 for dp in hist.data_points() {
299 let histogram_data_point =
300 opentelemetry_proto::tonic::metrics::v1::HistogramDataPoint {
301 attributes: dp.attributes().map(Into::into).collect(),
302 start_time_unix_nano: hist_start_time,
303 time_unix_nano: hist_time,
304 count: dp.count(),
305 sum: Some(dp.sum().into_f64()),
306 bucket_counts: dp.bucket_counts().collect(),
307 explicit_bounds: dp.bounds().collect(),
308 exemplars: Vec::new(), flags: default_flags,
310 min: dp.min().map(|v| v.into_f64()),
311 max: dp.max().map(|v| v.into_f64()),
312 };
313
314 let histogram_point_proto = opentelemetry_proto::tonic::metrics::v1::Histogram {
315 aggregation_temporality: hist.temporality() as i32,
316 data_points: vec![histogram_data_point],
317 };
318
319 export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics[0]
321 .data = Some(
322 opentelemetry_proto::tonic::metrics::v1::metric::Data::Histogram(
323 histogram_point_proto,
324 ),
325 );
326
327 byte_array.clear(); if self
329 .encode_and_emit_metric(export_metric_service_request_common, byte_array, metric)
330 .is_err()
331 {
332 failed_count += 1;
333 }
334 }
335 failed_count
336 }
337
338 fn process_exponential_histogram<T: Numeric>(
339 &self,
340 export_metric_service_request_common: &mut ExportMetricsServiceRequest,
341 byte_array: &mut Vec<u8>,
342 metric: &opentelemetry_sdk::metrics::data::Metric,
343 hist: &opentelemetry_sdk::metrics::data::ExponentialHistogram<T>,
344 ) -> usize {
345 let hist_start_time = to_nanos(hist.start_time());
347 let hist_time = to_nanos(hist.time());
348 let default_flags =
349 opentelemetry_proto::tonic::metrics::v1::DataPointFlags::default() as u32;
350
351 let metric_proto = opentelemetry_proto::tonic::metrics::v1::Metric {
353 name: metric.name().to_string(),
354 description: metric.description().to_string(),
355 unit: metric.unit().to_string(),
356 metadata: vec![],
357 data: None,
358 };
359
360 export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics =
361 vec![metric_proto];
362
363 let mut failed_count = 0;
364
365 for dp in hist.data_points() {
372 let histogram_data_point = opentelemetry_proto::tonic::metrics::v1::ExponentialHistogramDataPoint {
373 attributes: dp.attributes().map(Into::into).collect(),
374 start_time_unix_nano: hist_start_time,
375 time_unix_nano: hist_time,
376 count: dp.count() as u64,
377 sum: Some(dp.sum().into_f64()),
378 scale: dp.scale().into(),
379 zero_count: dp.zero_count(),
380 positive: Some(opentelemetry_proto::tonic::metrics::v1::exponential_histogram_data_point::Buckets {
381 offset: dp.positive_bucket().offset(),
382 bucket_counts: dp.positive_bucket().counts().collect(),
383 }),
384 negative: Some(opentelemetry_proto::tonic::metrics::v1::exponential_histogram_data_point::Buckets {
385 offset: dp.negative_bucket().offset(),
386 bucket_counts: dp.negative_bucket().counts().collect(),
387 }),
388 exemplars: Vec::new(), flags: default_flags,
390 min: dp.min().map(|v| v.into_f64()),
391 max: dp.max().map(|v| v.into_f64()),
392 zero_threshold: dp.zero_threshold(),
393 };
394
395 let histogram_point_proto =
396 opentelemetry_proto::tonic::metrics::v1::ExponentialHistogram {
397 aggregation_temporality: hist.temporality() as i32,
398 data_points: vec![histogram_data_point],
399 };
400
401 export_metric_service_request_common.resource_metrics[0].scope_metrics[0].metrics[0]
403 .data = Some(
404 opentelemetry_proto::tonic::metrics::v1::metric::Data::ExponentialHistogram(
405 histogram_point_proto,
406 ),
407 );
408
409 byte_array.clear(); if self
411 .encode_and_emit_metric(export_metric_service_request_common, byte_array, metric)
412 .is_err()
413 {
414 failed_count += 1;
415 }
416 }
417 failed_count
418 }
419
420 fn encode_and_emit_metric(
421 &self,
422 export_metric_service_request_common: &ExportMetricsServiceRequest,
423 byte_array: &mut Vec<u8>,
424 metric: &opentelemetry_sdk::metrics::data::Metric,
425 ) -> Result<(), String> {
426 match export_metric_service_request_common.encode(byte_array) {
427 Ok(_) => {
428 otel_debug!(name: "SerializationSucceeded",
429 metric_name = metric.name(),
430 size = byte_array.len());
431
432 if byte_array.len() > MAX_EVENT_SIZE {
433 let error_msg = format!("Encoded event size exceeds maximum allowed limit of {MAX_EVENT_SIZE} bytes. Event will be dropped.");
434 otel_debug!(
435 name: "EventSizeExceeded",
436 reason = &error_msg,
437 metric_name = metric.name(),
438 size = byte_array.len()
439 );
440 Err(error_msg)
441 } else {
442 let result = tracepoint::write(&self.trace_point, byte_array);
444 if result == 0 {
445 otel_debug!(name: "TracepointWriteSucceeded", message = "Encoded data successfully written to tracepoint", size = byte_array.len(), metric_name = metric.name());
446 Ok(())
447 } else {
448 let error_msg = "Failed to write to tracepoint".to_string();
449 otel_debug!(name: "TracepointWriteFailed", message = &error_msg, metric_name = metric.name(), result = result);
450 Err(error_msg)
451 }
452 }
453 }
454 Err(err) => {
455 let error_msg = format!("Serialization failed: {err}");
456 otel_debug!(name: "SerializationFailed",
457 error = &error_msg,
458 metric_name = metric.name(),
459 size = byte_array.len());
460 Err(error_msg)
461 }
462 }
463 }
464
465 fn export_resource_metrics(&self, resource_metric: &ResourceMetrics) -> OTelSdkResult {
466 let mut byte_array = Vec::new();
505 let mut has_failures = false;
506 let mut export_metric_service_request_common = ExportMetricsServiceRequest {
507 resource_metrics: vec![opentelemetry_proto::tonic::metrics::v1::ResourceMetrics {
508 resource: Some((resource_metric.resource()).into()),
509 scope_metrics: vec![],
510 schema_url: resource_metric
511 .resource()
512 .schema_url()
513 .unwrap_or_default()
514 .to_string(),
515 }],
516 };
517
518 for scope_metric in resource_metric.scope_metrics() {
524 let scope_metric_proto = opentelemetry_proto::tonic::metrics::v1::ScopeMetrics {
526 scope: Some((scope_metric.scope(), None).into()),
527 metrics: vec![],
528 schema_url: scope_metric
529 .scope()
530 .schema_url()
531 .unwrap_or_default()
532 .to_string(),
533 };
534
535 export_metric_service_request_common.resource_metrics[0].scope_metrics =
536 vec![scope_metric_proto];
537
538 for metric in scope_metric.metrics() {
544 let failed_count = match metric.data() {
545 AggregatedMetrics::F64(data) => {
546 self.process_numeric_metrics(
550 &mut export_metric_service_request_common,
551 &mut byte_array,
552 metric,
553 data,
554 )
555 }
556 AggregatedMetrics::U64(data) => self.process_numeric_metrics(
557 &mut export_metric_service_request_common,
558 &mut byte_array,
559 metric,
560 data,
561 ),
562 AggregatedMetrics::I64(data) => self.process_numeric_metrics(
563 &mut export_metric_service_request_common,
564 &mut byte_array,
565 metric,
566 data,
567 ),
568 };
569
570 if failed_count > 0 {
572 has_failures = true;
573 }
574 }
575 }
576
577 if has_failures {
580 Err(OTelSdkError::InternalFailure(
581 "Failed to export some metrics due to serialization or tracepoint write errors"
582 .to_string(),
583 ))
584 } else {
585 Ok(())
586 }
587 }
588}
589
590impl PushMetricExporter for MetricsExporter {
591 async fn export(&self, resource_metrics: &ResourceMetrics) -> OTelSdkResult {
592 otel_debug!(name: "ExportStarted", message = "Starting metrics export");
593 if !self.trace_point.enabled() {
594 otel_info!(name: "TracepointDisabled", message = "Tracepoint is disabled, skipping export");
596 Ok(())
597 } else {
598 self.export_resource_metrics(resource_metrics)
599 }
600 }
601
602 fn temporality(&self) -> Temporality {
603 Temporality::Delta
604 }
605
606 fn force_flush(&self) -> OTelSdkResult {
607 Ok(()) }
609
610 fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
611 Ok(())
614 }
615
616 fn shutdown(&self) -> OTelSdkResult {
617 self.shutdown_with_timeout(Duration::from_secs(5))
618 }
619}