opentelemetry_user_events_metrics/exporter/
mod.rs

1use async_trait::async_trait;
2use opentelemetry::{otel_debug, otel_warn};
3use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
4use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
5use opentelemetry_sdk::metrics::data;
6use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
7use opentelemetry_sdk::metrics::{
8    data::{
9        ExponentialBucket, ExponentialHistogramDataPoint, Metric, ResourceMetrics, ScopeMetrics,
10    },
11    Temporality,
12};
13
14use crate::tracepoint;
15use eventheader::_internal as ehi;
16use prost::Message;
17use std::fmt::{Debug, Formatter};
18use std::pin::Pin;
19
/// Largest encoded payload, in bytes, that `serialize_and_write` will hand to
/// the tracepoint; anything bigger is dropped and reported as an error.
// NOTE(review): presumably chosen to leave headroom below 64 KiB for event
// headers — TODO confirm against the user_events limits.
const MAX_EVENT_SIZE: usize = 65360;
21
/// Metrics exporter that encodes metrics as OTLP protobuf
/// (`ExportMetricsServiceRequest`) and writes them to a tracepoint.
pub struct MetricsExporter {
    /// Pinned, heap-allocated tracepoint state. It is registered in
    /// [`MetricsExporter::new`] and used for every write and `enabled()` check.
    trace_point: Pin<Box<ehi::TracepointState>>,
}
25
26impl MetricsExporter {
27    pub fn new() -> MetricsExporter {
28        let trace_point = Box::pin(ehi::TracepointState::new(0));
29        // This is unsafe because if the code is used in a shared object,
30        // the event MUST be unregistered before the shared object unloads.
31        unsafe {
32            let _result = tracepoint::register(trace_point.as_ref());
33        }
34        MetricsExporter { trace_point }
35    }
36}
37
38impl Default for MetricsExporter {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl Debug for MetricsExporter {
45    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
46        f.write_str("user_events metrics exporter")
47    }
48}
49
50impl MetricsExporter {
51    fn serialize_and_write(
52        &self,
53        resource_metric: &ResourceMetrics,
54        metric_name: &str,
55        metric_type: &str,
56    ) -> OTelSdkResult {
57        // Allocate a local buffer for each write operation
58        // TODO: Investigate if this can be optimized to avoid reallocation or
59        // allocate a fixed buffer size for all writes
60        let mut byte_array = Vec::new();
61
62        // Convert to proto message
63        let proto_message: ExportMetricsServiceRequest = resource_metric.into();
64        otel_debug!(name: "SerializeStart", 
65            metric_name = metric_name,
66            metric_type = metric_type);
67
68        // Encode directly into the buffer
69        match proto_message.encode(&mut byte_array) {
70            Ok(_) => {
71                otel_debug!(name: "SerializeSuccess", 
72                    metric_name = metric_name,
73                    metric_type = metric_type,
74                    size = byte_array.len());
75            }
76            Err(err) => {
77                otel_debug!(name: "SerializeFailed",
78                    error = err.to_string(),
79                    metric_name = metric_name,
80                    metric_type = metric_type,
81                    size = byte_array.len());
82                return Err(OTelSdkError::InternalFailure(err.to_string()));
83            }
84        }
85
86        // Check if the encoded message exceeds the 64 KB limit
87        if byte_array.len() > MAX_EVENT_SIZE {
88            otel_debug!(
89                name: "MaxEventSizeExceeded",
90                reason = format!("Encoded event size exceeds maximum allowed limit of {} bytes. Event will be dropped.", MAX_EVENT_SIZE),
91                metric_name = metric_name,
92                metric_type = metric_type,
93                size = byte_array.len()
94            );
95            return Err(OTelSdkError::InternalFailure(
96                "Event size exceeds maximum allowed limit".into(),
97            ));
98        }
99
100        // Write to the tracepoint
101        let result = tracepoint::write(&self.trace_point, &byte_array);
102        if result > 0 {
103            otel_debug!(name: "TracepointWrite", message = "Encoded data successfully written to tracepoint", size = byte_array.len(), metric_name = metric_name, metric_type = metric_type);
104        }
105
106        Ok(())
107    }
108}
109
110#[async_trait]
111impl PushMetricExporter for MetricsExporter {
112    async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
113        otel_debug!(name: "ExportStart", message = "Starting metrics export");
114        if !self.trace_point.enabled() {
115            // TODO - This can flood the logs if the tracepoint is disabled for long periods of time
116            otel_warn!(name: "TracepointDisabled", message = "Tracepoint is disabled, skipping export");
117            return Ok(());
118        }
119
120        if self.trace_point.enabled() {
121            let mut errors = Vec::new();
122
123            for scope_metric in &metrics.scope_metrics {
124                for metric in &scope_metric.metrics {
125                    let data = &metric.data.as_any();
126
127                    if let Some(histogram) = data.downcast_ref::<data::Histogram<u64>>() {
128                        for data_point in &histogram.data_points {
129                            let resource_metric = ResourceMetrics {
130                                resource: metrics.resource.clone(),
131                                scope_metrics: vec![ScopeMetrics {
132                                    scope: scope_metric.scope.clone(),
133                                    metrics: vec![Metric {
134                                        name: metric.name.clone(),
135                                        description: metric.description.clone(),
136                                        unit: metric.unit.clone(),
137                                        data: Box::new(data::Histogram {
138                                            temporality: histogram.temporality,
139                                            start_time: histogram.start_time,
140                                            time: histogram.time,
141                                            data_points: vec![data_point.clone()],
142                                        }),
143                                    }],
144                                }],
145                            };
146                            if let Err(e) = self.serialize_and_write(
147                                &resource_metric,
148                                &metric.name,
149                                "Histogram<u64>",
150                            ) {
151                                errors.push(e.to_string());
152                            }
153                        }
154                    } else if let Some(histogram) = data.downcast_ref::<data::Histogram<f64>>() {
155                        for data_point in &histogram.data_points {
156                            let resource_metric = ResourceMetrics {
157                                resource: metrics.resource.clone(),
158                                scope_metrics: vec![ScopeMetrics {
159                                    scope: scope_metric.scope.clone(),
160                                    metrics: vec![Metric {
161                                        name: metric.name.clone(),
162                                        description: metric.description.clone(),
163                                        unit: metric.unit.clone(),
164                                        data: Box::new(data::Histogram {
165                                            temporality: histogram.temporality,
166                                            start_time: histogram.start_time,
167                                            time: histogram.time,
168                                            data_points: vec![data_point.clone()],
169                                        }),
170                                    }],
171                                }],
172                            };
173                            if let Err(e) = self.serialize_and_write(
174                                &resource_metric,
175                                &metric.name,
176                                "Histogram<f64>",
177                            ) {
178                                errors.push(e.to_string());
179                            }
180                        }
181                    } else if let Some(gauge) = data.downcast_ref::<data::Gauge<u64>>() {
182                        for data_point in &gauge.data_points {
183                            let resource_metric = ResourceMetrics {
184                                resource: metrics.resource.clone(),
185                                scope_metrics: vec![ScopeMetrics {
186                                    scope: scope_metric.scope.clone(),
187                                    metrics: vec![Metric {
188                                        name: metric.name.clone(),
189                                        description: metric.description.clone(),
190                                        unit: metric.unit.clone(),
191                                        data: Box::new(data::Gauge {
192                                            data_points: vec![data_point.clone()],
193                                            start_time: gauge.start_time,
194                                            time: gauge.time,
195                                        }),
196                                    }],
197                                }],
198                            };
199                            if let Err(e) = self.serialize_and_write(
200                                &resource_metric,
201                                &metric.name,
202                                "Gauge<u64>",
203                            ) {
204                                errors.push(e.to_string());
205                            }
206                        }
207                    } else if let Some(gauge) = data.downcast_ref::<data::Gauge<i64>>() {
208                        for data_point in &gauge.data_points {
209                            let resource_metric = ResourceMetrics {
210                                resource: metrics.resource.clone(),
211                                scope_metrics: vec![ScopeMetrics {
212                                    scope: scope_metric.scope.clone(),
213                                    metrics: vec![Metric {
214                                        name: metric.name.clone(),
215                                        description: metric.description.clone(),
216                                        unit: metric.unit.clone(),
217                                        data: Box::new(data::Gauge {
218                                            data_points: vec![data_point.clone()],
219                                            start_time: gauge.start_time,
220                                            time: gauge.time,
221                                        }),
222                                    }],
223                                }],
224                            };
225                            if let Err(e) = self.serialize_and_write(
226                                &resource_metric,
227                                &metric.name,
228                                "Gauge<i64>",
229                            ) {
230                                errors.push(e.to_string());
231                            }
232                        }
233                    } else if let Some(gauge) = data.downcast_ref::<data::Gauge<f64>>() {
234                        for data_point in &gauge.data_points {
235                            let resource_metric = ResourceMetrics {
236                                resource: metrics.resource.clone(),
237                                scope_metrics: vec![ScopeMetrics {
238                                    scope: scope_metric.scope.clone(),
239                                    metrics: vec![Metric {
240                                        name: metric.name.clone(),
241                                        description: metric.description.clone(),
242                                        unit: metric.unit.clone(),
243                                        data: Box::new(data::Gauge {
244                                            data_points: vec![data_point.clone()],
245                                            start_time: gauge.start_time,
246                                            time: gauge.time,
247                                        }),
248                                    }],
249                                }],
250                            };
251                            if let Err(e) = self.serialize_and_write(
252                                &resource_metric,
253                                &metric.name,
254                                "Gauge<f64>",
255                            ) {
256                                errors.push(e.to_string());
257                            }
258                        }
259                    } else if let Some(sum) = data.downcast_ref::<data::Sum<u64>>() {
260                        for data_point in &sum.data_points {
261                            let resource_metric = ResourceMetrics {
262                                resource: metrics.resource.clone(),
263                                scope_metrics: vec![ScopeMetrics {
264                                    scope: scope_metric.scope.clone(),
265                                    metrics: vec![Metric {
266                                        name: metric.name.clone(),
267                                        description: metric.description.clone(),
268                                        unit: metric.unit.clone(),
269                                        data: Box::new(data::Sum {
270                                            temporality: sum.temporality,
271                                            data_points: vec![data_point.clone()],
272                                            is_monotonic: sum.is_monotonic,
273                                            start_time: sum.start_time,
274                                            time: sum.time,
275                                        }),
276                                    }],
277                                }],
278                            };
279                            if let Err(e) =
280                                self.serialize_and_write(&resource_metric, &metric.name, "Sum<u64>")
281                            {
282                                errors.push(e.to_string());
283                            }
284                        }
285                    } else if let Some(sum) = data.downcast_ref::<data::Sum<i64>>() {
286                        for data_point in &sum.data_points {
287                            let resource_metric = ResourceMetrics {
288                                resource: metrics.resource.clone(),
289                                scope_metrics: vec![ScopeMetrics {
290                                    scope: scope_metric.scope.clone(),
291                                    metrics: vec![Metric {
292                                        name: metric.name.clone(),
293                                        description: metric.description.clone(),
294                                        unit: metric.unit.clone(),
295                                        data: Box::new(data::Sum {
296                                            temporality: sum.temporality,
297                                            data_points: vec![data_point.clone()],
298                                            is_monotonic: sum.is_monotonic,
299                                            start_time: sum.start_time,
300                                            time: sum.time,
301                                        }),
302                                    }],
303                                }],
304                            };
305                            if let Err(e) =
306                                self.serialize_and_write(&resource_metric, &metric.name, "Sum<i64>")
307                            {
308                                errors.push(e.to_string());
309                            }
310                        }
311                    } else if let Some(sum) = data.downcast_ref::<data::Sum<f64>>() {
312                        for data_point in &sum.data_points {
313                            let resource_metric = ResourceMetrics {
314                                resource: metrics.resource.clone(),
315                                scope_metrics: vec![ScopeMetrics {
316                                    scope: scope_metric.scope.clone(),
317                                    metrics: vec![Metric {
318                                        name: metric.name.clone(),
319                                        description: metric.description.clone(),
320                                        unit: metric.unit.clone(),
321                                        data: Box::new(data::Sum {
322                                            temporality: sum.temporality,
323                                            data_points: vec![data_point.clone()],
324                                            is_monotonic: sum.is_monotonic,
325                                            start_time: sum.start_time,
326                                            time: sum.time,
327                                        }),
328                                    }],
329                                }],
330                            };
331                            if let Err(e) =
332                                self.serialize_and_write(&resource_metric, &metric.name, "Sum<f64>")
333                            {
334                                errors.push(e.to_string());
335                            }
336                        }
337                    } else if let Some(exp_hist) =
338                        data.downcast_ref::<data::ExponentialHistogram<u64>>()
339                    {
340                        for data_point in &exp_hist.data_points {
341                            let resource_metric = ResourceMetrics {
342                                resource: metrics.resource.clone(),
343                                scope_metrics: vec![ScopeMetrics {
344                                    scope: scope_metric.scope.clone(),
345                                    metrics: vec![Metric {
346                                        name: metric.name.clone(),
347                                        description: metric.description.clone(),
348                                        unit: metric.unit.clone(),
349                                        data: Box::new(data::ExponentialHistogram {
350                                            temporality: exp_hist.temporality,
351                                            start_time: exp_hist.start_time,
352                                            time: exp_hist.time,
353                                            data_points: vec![ExponentialHistogramDataPoint {
354                                                attributes: data_point.attributes.clone(),
355                                                count: data_point.count,
356                                                min: data_point.min,
357                                                max: data_point.max,
358                                                sum: data_point.sum,
359                                                scale: data_point.scale,
360                                                zero_count: data_point.zero_count,
361                                                zero_threshold: data_point.zero_threshold,
362                                                positive_bucket: ExponentialBucket {
363                                                    offset: data_point.positive_bucket.offset,
364                                                    counts: data_point
365                                                        .positive_bucket
366                                                        .counts
367                                                        .clone(),
368                                                },
369                                                negative_bucket: ExponentialBucket {
370                                                    offset: data_point.negative_bucket.offset,
371                                                    counts: data_point
372                                                        .negative_bucket
373                                                        .counts
374                                                        .clone(),
375                                                },
376                                                exemplars: data_point.exemplars.clone(),
377                                            }],
378                                        }),
379                                    }],
380                                }],
381                            };
382                            if let Err(e) = self.serialize_and_write(
383                                &resource_metric,
384                                &metric.name,
385                                "ExponentialHistogram<u64>",
386                            ) {
387                                errors.push(e.to_string());
388                            }
389                        }
390                    } else if let Some(exp_hist) =
391                        data.downcast_ref::<data::ExponentialHistogram<f64>>()
392                    {
393                        for data_point in &exp_hist.data_points {
394                            let resource_metric = ResourceMetrics {
395                                resource: metrics.resource.clone(),
396                                scope_metrics: vec![ScopeMetrics {
397                                    scope: scope_metric.scope.clone(),
398                                    metrics: vec![Metric {
399                                        name: metric.name.clone(),
400                                        description: metric.description.clone(),
401                                        unit: metric.unit.clone(),
402                                        data: Box::new(data::ExponentialHistogram {
403                                            temporality: exp_hist.temporality,
404                                            start_time: exp_hist.start_time,
405                                            time: exp_hist.time,
406                                            data_points: vec![ExponentialHistogramDataPoint {
407                                                attributes: data_point.attributes.clone(),
408                                                count: data_point.count,
409                                                min: data_point.min,
410                                                max: data_point.max,
411                                                sum: data_point.sum,
412                                                scale: data_point.scale,
413                                                zero_count: data_point.zero_count,
414                                                zero_threshold: data_point.zero_threshold,
415                                                positive_bucket: ExponentialBucket {
416                                                    offset: data_point.positive_bucket.offset,
417                                                    counts: data_point
418                                                        .positive_bucket
419                                                        .counts
420                                                        .clone(),
421                                                },
422                                                negative_bucket: ExponentialBucket {
423                                                    offset: data_point.negative_bucket.offset,
424                                                    counts: data_point
425                                                        .negative_bucket
426                                                        .counts
427                                                        .clone(),
428                                                },
429                                                exemplars: data_point.exemplars.clone(),
430                                            }],
431                                        }),
432                                    }],
433                                }],
434                            };
435                            if let Err(e) = self.serialize_and_write(
436                                &resource_metric,
437                                &metric.name,
438                                "ExponentialHistogram<f64>",
439                            ) {
440                                errors.push(e.to_string());
441                            }
442                        }
443                    }
444                }
445            }
446
447            // Return any errors if present
448            if !errors.is_empty() {
449                let error_message = format!(
450                    "Export encountered {} errors: [{}]",
451                    errors.len(),
452                    errors.join("; ")
453                );
454                return Err(OTelSdkError::InternalFailure(error_message));
455            }
456        }
457        Ok(())
458    }
459
460    fn temporality(&self) -> Temporality {
461        Temporality::Delta
462    }
463
464    async fn force_flush(&self) -> OTelSdkResult {
465        Ok(()) // In this implementation, flush does nothing
466    }
467
468    fn shutdown(&self) -> OTelSdkResult {
469        // TracepointState automatically unregisters when dropped
470        // https://github.com/microsoft/LinuxTracepoints-Rust/blob/main/eventheader/src/native.rs#L618
471        Ok(())
472    }
473}