opentelemetry_user_events_metrics/exporter/
mod.rs

1use opentelemetry::{otel_debug, otel_warn};
2use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
3use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
4use opentelemetry_sdk::metrics::data;
5use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
6use opentelemetry_sdk::metrics::{
7    data::{
8        ExponentialBucket, ExponentialHistogramDataPoint, Metric, ResourceMetrics, ScopeMetrics,
9    },
10    Temporality,
11};
12
13use crate::tracepoint;
14use eventheader::_internal as ehi;
15use prost::Message;
16use std::fmt::{Debug, Formatter};
17use std::pin::Pin;
18
19const MAX_EVENT_SIZE: usize = 65360;
20
21pub struct MetricsExporter {
22    trace_point: Pin<Box<ehi::TracepointState>>,
23}
24
25impl MetricsExporter {
26    pub fn new() -> MetricsExporter {
27        let trace_point = Box::pin(ehi::TracepointState::new(0));
28        // This is unsafe because if the code is used in a shared object,
29        // the event MUST be unregistered before the shared object unloads.
30        unsafe {
31            let _result = tracepoint::register(trace_point.as_ref());
32        }
33        MetricsExporter { trace_point }
34    }
35}
36
37impl Default for MetricsExporter {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl Debug for MetricsExporter {
44    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45        f.write_str("user_events metrics exporter")
46    }
47}
48
49impl MetricsExporter {
50    fn serialize_and_write(
51        &self,
52        resource_metric: &ResourceMetrics,
53        metric_name: &str,
54        metric_type: &str,
55    ) -> OTelSdkResult {
56        // Allocate a local buffer for each write operation
57        // TODO: Investigate if this can be optimized to avoid reallocation or
58        // allocate a fixed buffer size for all writes
59        let mut byte_array = Vec::new();
60
61        // Convert to proto message
62        let proto_message: ExportMetricsServiceRequest = resource_metric.into();
63        otel_debug!(name: "SerializeStart", 
64            metric_name = metric_name,
65            metric_type = metric_type);
66
67        // Encode directly into the buffer
68        match proto_message.encode(&mut byte_array) {
69            Ok(_) => {
70                otel_debug!(name: "SerializeSuccess", 
71                    metric_name = metric_name,
72                    metric_type = metric_type,
73                    size = byte_array.len());
74            }
75            Err(err) => {
76                otel_debug!(name: "SerializeFailed",
77                    error = err.to_string(),
78                    metric_name = metric_name,
79                    metric_type = metric_type,
80                    size = byte_array.len());
81                return Err(OTelSdkError::InternalFailure(err.to_string()));
82            }
83        }
84
85        // Check if the encoded message exceeds the 64 KB limit
86        if byte_array.len() > MAX_EVENT_SIZE {
87            otel_debug!(
88                name: "MaxEventSizeExceeded",
89                reason = format!("Encoded event size exceeds maximum allowed limit of {} bytes. Event will be dropped.", MAX_EVENT_SIZE),
90                metric_name = metric_name,
91                metric_type = metric_type,
92                size = byte_array.len()
93            );
94            return Err(OTelSdkError::InternalFailure(
95                "Event size exceeds maximum allowed limit".into(),
96            ));
97        }
98
99        // Write to the tracepoint
100        let result = tracepoint::write(&self.trace_point, &byte_array);
101        if result > 0 {
102            otel_debug!(name: "TracepointWrite", message = "Encoded data successfully written to tracepoint", size = byte_array.len(), metric_name = metric_name, metric_type = metric_type);
103        }
104
105        Ok(())
106    }
107}
108
109impl PushMetricExporter for MetricsExporter {
110    async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
111        otel_debug!(name: "ExportStart", message = "Starting metrics export");
112        if !self.trace_point.enabled() {
113            // TODO - This can flood the logs if the tracepoint is disabled for long periods of time
114            otel_warn!(name: "TracepointDisabled", message = "Tracepoint is disabled, skipping export");
115            return Ok(());
116        } else {
117            let mut errors = Vec::new();
118
119            for scope_metric in &metrics.scope_metrics {
120                for metric in &scope_metric.metrics {
121                    let data = &metric.data.as_any();
122
123                    if let Some(histogram) = data.downcast_ref::<data::Histogram<u64>>() {
124                        for data_point in &histogram.data_points {
125                            let resource_metric = ResourceMetrics {
126                                resource: metrics.resource.clone(),
127                                scope_metrics: vec![ScopeMetrics {
128                                    scope: scope_metric.scope.clone(),
129                                    metrics: vec![Metric {
130                                        name: metric.name.clone(),
131                                        description: metric.description.clone(),
132                                        unit: metric.unit.clone(),
133                                        data: Box::new(data::Histogram {
134                                            temporality: histogram.temporality,
135                                            start_time: histogram.start_time,
136                                            time: histogram.time,
137                                            data_points: vec![data_point.clone()],
138                                        }),
139                                    }],
140                                }],
141                            };
142                            if let Err(e) = self.serialize_and_write(
143                                &resource_metric,
144                                &metric.name,
145                                "Histogram<u64>",
146                            ) {
147                                errors.push(e.to_string());
148                            }
149                        }
150                    } else if let Some(histogram) = data.downcast_ref::<data::Histogram<f64>>() {
151                        for data_point in &histogram.data_points {
152                            let resource_metric = ResourceMetrics {
153                                resource: metrics.resource.clone(),
154                                scope_metrics: vec![ScopeMetrics {
155                                    scope: scope_metric.scope.clone(),
156                                    metrics: vec![Metric {
157                                        name: metric.name.clone(),
158                                        description: metric.description.clone(),
159                                        unit: metric.unit.clone(),
160                                        data: Box::new(data::Histogram {
161                                            temporality: histogram.temporality,
162                                            start_time: histogram.start_time,
163                                            time: histogram.time,
164                                            data_points: vec![data_point.clone()],
165                                        }),
166                                    }],
167                                }],
168                            };
169                            if let Err(e) = self.serialize_and_write(
170                                &resource_metric,
171                                &metric.name,
172                                "Histogram<f64>",
173                            ) {
174                                errors.push(e.to_string());
175                            }
176                        }
177                    } else if let Some(gauge) = data.downcast_ref::<data::Gauge<u64>>() {
178                        for data_point in &gauge.data_points {
179                            let resource_metric = ResourceMetrics {
180                                resource: metrics.resource.clone(),
181                                scope_metrics: vec![ScopeMetrics {
182                                    scope: scope_metric.scope.clone(),
183                                    metrics: vec![Metric {
184                                        name: metric.name.clone(),
185                                        description: metric.description.clone(),
186                                        unit: metric.unit.clone(),
187                                        data: Box::new(data::Gauge {
188                                            data_points: vec![data_point.clone()],
189                                            start_time: gauge.start_time,
190                                            time: gauge.time,
191                                        }),
192                                    }],
193                                }],
194                            };
195                            if let Err(e) = self.serialize_and_write(
196                                &resource_metric,
197                                &metric.name,
198                                "Gauge<u64>",
199                            ) {
200                                errors.push(e.to_string());
201                            }
202                        }
203                    } else if let Some(gauge) = data.downcast_ref::<data::Gauge<i64>>() {
204                        for data_point in &gauge.data_points {
205                            let resource_metric = ResourceMetrics {
206                                resource: metrics.resource.clone(),
207                                scope_metrics: vec![ScopeMetrics {
208                                    scope: scope_metric.scope.clone(),
209                                    metrics: vec![Metric {
210                                        name: metric.name.clone(),
211                                        description: metric.description.clone(),
212                                        unit: metric.unit.clone(),
213                                        data: Box::new(data::Gauge {
214                                            data_points: vec![data_point.clone()],
215                                            start_time: gauge.start_time,
216                                            time: gauge.time,
217                                        }),
218                                    }],
219                                }],
220                            };
221                            if let Err(e) = self.serialize_and_write(
222                                &resource_metric,
223                                &metric.name,
224                                "Gauge<i64>",
225                            ) {
226                                errors.push(e.to_string());
227                            }
228                        }
229                    } else if let Some(gauge) = data.downcast_ref::<data::Gauge<f64>>() {
230                        for data_point in &gauge.data_points {
231                            let resource_metric = ResourceMetrics {
232                                resource: metrics.resource.clone(),
233                                scope_metrics: vec![ScopeMetrics {
234                                    scope: scope_metric.scope.clone(),
235                                    metrics: vec![Metric {
236                                        name: metric.name.clone(),
237                                        description: metric.description.clone(),
238                                        unit: metric.unit.clone(),
239                                        data: Box::new(data::Gauge {
240                                            data_points: vec![data_point.clone()],
241                                            start_time: gauge.start_time,
242                                            time: gauge.time,
243                                        }),
244                                    }],
245                                }],
246                            };
247                            if let Err(e) = self.serialize_and_write(
248                                &resource_metric,
249                                &metric.name,
250                                "Gauge<f64>",
251                            ) {
252                                errors.push(e.to_string());
253                            }
254                        }
255                    } else if let Some(sum) = data.downcast_ref::<data::Sum<u64>>() {
256                        for data_point in &sum.data_points {
257                            let resource_metric = ResourceMetrics {
258                                resource: metrics.resource.clone(),
259                                scope_metrics: vec![ScopeMetrics {
260                                    scope: scope_metric.scope.clone(),
261                                    metrics: vec![Metric {
262                                        name: metric.name.clone(),
263                                        description: metric.description.clone(),
264                                        unit: metric.unit.clone(),
265                                        data: Box::new(data::Sum {
266                                            temporality: sum.temporality,
267                                            data_points: vec![data_point.clone()],
268                                            is_monotonic: sum.is_monotonic,
269                                            start_time: sum.start_time,
270                                            time: sum.time,
271                                        }),
272                                    }],
273                                }],
274                            };
275                            if let Err(e) =
276                                self.serialize_and_write(&resource_metric, &metric.name, "Sum<u64>")
277                            {
278                                errors.push(e.to_string());
279                            }
280                        }
281                    } else if let Some(sum) = data.downcast_ref::<data::Sum<i64>>() {
282                        for data_point in &sum.data_points {
283                            let resource_metric = ResourceMetrics {
284                                resource: metrics.resource.clone(),
285                                scope_metrics: vec![ScopeMetrics {
286                                    scope: scope_metric.scope.clone(),
287                                    metrics: vec![Metric {
288                                        name: metric.name.clone(),
289                                        description: metric.description.clone(),
290                                        unit: metric.unit.clone(),
291                                        data: Box::new(data::Sum {
292                                            temporality: sum.temporality,
293                                            data_points: vec![data_point.clone()],
294                                            is_monotonic: sum.is_monotonic,
295                                            start_time: sum.start_time,
296                                            time: sum.time,
297                                        }),
298                                    }],
299                                }],
300                            };
301                            if let Err(e) =
302                                self.serialize_and_write(&resource_metric, &metric.name, "Sum<i64>")
303                            {
304                                errors.push(e.to_string());
305                            }
306                        }
307                    } else if let Some(sum) = data.downcast_ref::<data::Sum<f64>>() {
308                        for data_point in &sum.data_points {
309                            let resource_metric = ResourceMetrics {
310                                resource: metrics.resource.clone(),
311                                scope_metrics: vec![ScopeMetrics {
312                                    scope: scope_metric.scope.clone(),
313                                    metrics: vec![Metric {
314                                        name: metric.name.clone(),
315                                        description: metric.description.clone(),
316                                        unit: metric.unit.clone(),
317                                        data: Box::new(data::Sum {
318                                            temporality: sum.temporality,
319                                            data_points: vec![data_point.clone()],
320                                            is_monotonic: sum.is_monotonic,
321                                            start_time: sum.start_time,
322                                            time: sum.time,
323                                        }),
324                                    }],
325                                }],
326                            };
327                            if let Err(e) =
328                                self.serialize_and_write(&resource_metric, &metric.name, "Sum<f64>")
329                            {
330                                errors.push(e.to_string());
331                            }
332                        }
333                    } else if let Some(exp_hist) =
334                        data.downcast_ref::<data::ExponentialHistogram<u64>>()
335                    {
336                        for data_point in &exp_hist.data_points {
337                            let resource_metric = ResourceMetrics {
338                                resource: metrics.resource.clone(),
339                                scope_metrics: vec![ScopeMetrics {
340                                    scope: scope_metric.scope.clone(),
341                                    metrics: vec![Metric {
342                                        name: metric.name.clone(),
343                                        description: metric.description.clone(),
344                                        unit: metric.unit.clone(),
345                                        data: Box::new(data::ExponentialHistogram {
346                                            temporality: exp_hist.temporality,
347                                            start_time: exp_hist.start_time,
348                                            time: exp_hist.time,
349                                            data_points: vec![ExponentialHistogramDataPoint {
350                                                attributes: data_point.attributes.clone(),
351                                                count: data_point.count,
352                                                min: data_point.min,
353                                                max: data_point.max,
354                                                sum: data_point.sum,
355                                                scale: data_point.scale,
356                                                zero_count: data_point.zero_count,
357                                                zero_threshold: data_point.zero_threshold,
358                                                positive_bucket: ExponentialBucket {
359                                                    offset: data_point.positive_bucket.offset,
360                                                    counts: data_point
361                                                        .positive_bucket
362                                                        .counts
363                                                        .clone(),
364                                                },
365                                                negative_bucket: ExponentialBucket {
366                                                    offset: data_point.negative_bucket.offset,
367                                                    counts: data_point
368                                                        .negative_bucket
369                                                        .counts
370                                                        .clone(),
371                                                },
372                                                exemplars: data_point.exemplars.clone(),
373                                            }],
374                                        }),
375                                    }],
376                                }],
377                            };
378                            if let Err(e) = self.serialize_and_write(
379                                &resource_metric,
380                                &metric.name,
381                                "ExponentialHistogram<u64>",
382                            ) {
383                                errors.push(e.to_string());
384                            }
385                        }
386                    } else if let Some(exp_hist) =
387                        data.downcast_ref::<data::ExponentialHistogram<f64>>()
388                    {
389                        for data_point in &exp_hist.data_points {
390                            let resource_metric = ResourceMetrics {
391                                resource: metrics.resource.clone(),
392                                scope_metrics: vec![ScopeMetrics {
393                                    scope: scope_metric.scope.clone(),
394                                    metrics: vec![Metric {
395                                        name: metric.name.clone(),
396                                        description: metric.description.clone(),
397                                        unit: metric.unit.clone(),
398                                        data: Box::new(data::ExponentialHistogram {
399                                            temporality: exp_hist.temporality,
400                                            start_time: exp_hist.start_time,
401                                            time: exp_hist.time,
402                                            data_points: vec![ExponentialHistogramDataPoint {
403                                                attributes: data_point.attributes.clone(),
404                                                count: data_point.count,
405                                                min: data_point.min,
406                                                max: data_point.max,
407                                                sum: data_point.sum,
408                                                scale: data_point.scale,
409                                                zero_count: data_point.zero_count,
410                                                zero_threshold: data_point.zero_threshold,
411                                                positive_bucket: ExponentialBucket {
412                                                    offset: data_point.positive_bucket.offset,
413                                                    counts: data_point
414                                                        .positive_bucket
415                                                        .counts
416                                                        .clone(),
417                                                },
418                                                negative_bucket: ExponentialBucket {
419                                                    offset: data_point.negative_bucket.offset,
420                                                    counts: data_point
421                                                        .negative_bucket
422                                                        .counts
423                                                        .clone(),
424                                                },
425                                                exemplars: data_point.exemplars.clone(),
426                                            }],
427                                        }),
428                                    }],
429                                }],
430                            };
431                            if let Err(e) = self.serialize_and_write(
432                                &resource_metric,
433                                &metric.name,
434                                "ExponentialHistogram<f64>",
435                            ) {
436                                errors.push(e.to_string());
437                            }
438                        }
439                    }
440                }
441            }
442
443            // Return any errors if present
444            if !errors.is_empty() {
445                let error_message = format!(
446                    "Export encountered {} errors: [{}]",
447                    errors.len(),
448                    errors.join("; ")
449                );
450                return Err(OTelSdkError::InternalFailure(error_message));
451            }
452        }
453        Ok(())
454    }
455
456    fn temporality(&self) -> Temporality {
457        Temporality::Delta
458    }
459
460    fn force_flush(&self) -> OTelSdkResult {
461        Ok(()) // In this implementation, flush does nothing
462    }
463
464    fn shutdown(&self) -> OTelSdkResult {
465        // TracepointState automatically deregisters when dropped
466        // https://github.com/microsoft/LinuxTracepoints-Rust/blob/main/eventheader/src/native.rs#L618
467        Ok(())
468    }
469}