opentelemetry_user_events_metrics/
lib.rs

1mod exporter;
2mod tracepoint;
3
4pub use exporter::MetricsExporter;
5
6#[cfg(test)]
7mod tests {
8    use crate::MetricsExporter;
9    use opentelemetry::metrics::MeterProvider;
10    use opentelemetry::KeyValue;
11    use opentelemetry_sdk::metrics::SdkMeterProvider;
12    use opentelemetry_sdk::Resource;
13
14    mod test_utils {
15        use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
16        use prost::Message;
17        use serde_json::{self, Value};
18        use std::process::Command;
19
/// One user event record taken from the decoded perf data JSON.
///
/// Each field mirrors a JSON key read by `parse_user_event_record`.
// `protocol` and `version` are populated but never read inside this test
// module, hence the `dead_code` allowance.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct UserEventRecord {
    /// Event name, read from the JSON `"n"` key.
    pub name: String,
    /// Value of the JSON `"protocol"` key.
    pub protocol: u32,
    /// Value of the JSON `"version"` key.
    pub version: String,
    /// Payload bytes collected from the JSON `"buffer"` array.
    pub buffer: Vec<u8>,
}
29
30        /// Extract user event records from JSON content
31        pub fn extract_user_events(
32            json_content: &str,
33        ) -> Result<Vec<UserEventRecord>, Box<dyn std::error::Error>> {
34            let parsed: Value = serde_json::from_str(json_content)?;
35            let mut records = Vec::new();
36
37            // The JSON structure is { "./perf.data": [array of events] }
38            if let Some(events_map) = parsed.as_object() {
39                for (_, events_value) in events_map {
40                    if let Some(events_array) = events_value.as_array() {
41                        for event in events_array {
42                            if let Some(record) = parse_user_event_record(event)? {
43                                records.push(record);
44                            }
45                        }
46                    }
47                }
48            }
49
50            Ok(records)
51        }
52
53        /// Parse a single user event record from JSON (test-only)
54        fn parse_user_event_record(
55            event: &Value,
56        ) -> Result<Option<UserEventRecord>, Box<dyn std::error::Error>> {
57            let name = event["n"].as_str().unwrap_or("").to_string();
58            let protocol = event["protocol"].as_u64().unwrap_or(0) as u32;
59            let version = event["version"].as_str().unwrap_or("").to_string();
60
61            // Extract buffer as Vec<u8>
62            let buffer = if let Some(buffer_array) = event["buffer"].as_array() {
63                buffer_array
64                    .iter()
65                    .filter_map(|v| v.as_u64().map(|n| n as u8))
66                    .collect()
67            } else {
68                Vec::new()
69            };
70
71            Ok(Some(UserEventRecord {
72                name,
73                protocol,
74                version,
75                buffer,
76            }))
77        }
78
79        /// Decode OTLP protobuf buffer to ExportMetricsServiceRequest (test-only)
80        fn decode_otlp_metrics(
81            buffer: &[u8],
82        ) -> Result<ExportMetricsServiceRequest, Box<dyn std::error::Error>> {
83            let request = ExportMetricsServiceRequest::decode(buffer)?;
84            Ok(request)
85        }
86
87        /// Helper function to process all OTLP metrics from JSON content (test-only)
88        pub fn extract_and_decode_otlp_metrics(
89            json_content: &str,
90        ) -> Result<Vec<ExportMetricsServiceRequest>, Box<dyn std::error::Error>> {
91            let user_events = extract_user_events(json_content)?;
92            let mut decoded_metrics = Vec::new();
93
94            for event in user_events {
95                // Filter for OTLP metrics events
96                if event.name.contains("otlp_metrics") {
97                    match decode_otlp_metrics(&event.buffer) {
98                        Ok(metrics_request) => {
99                            decoded_metrics.push(metrics_request);
100                        }
101                        Err(e) => {
102                            eprintln!("Failed to decode OTLP metrics from buffer: {e}");
103                            // Continue processing other events instead of failing completely
104                        }
105                    }
106                }
107            }
108
109            Ok(decoded_metrics)
110        }
111
/// Read `/sys/kernel/tracing/user_events_status` through `sudo cat`.
///
/// Returns the command's stdout (lossily decoded as UTF-8) when it exits
/// successfully.
///
/// # Errors
/// Returns a message if the command cannot be spawned, or its stderr if it
/// exits with a failure status.
pub fn check_user_events_available() -> Result<String, String> {
    let outcome = Command::new("sudo")
        .args(["cat", "/sys/kernel/tracing/user_events_status"])
        .output()
        .map_err(|e| format!("Failed to execute command: {e}"))?;

    if !outcome.status.success() {
        return Err(format!(
            "Command executed with failing error code: {}",
            String::from_utf8_lossy(&outcome.stderr)
        ));
    }

    Ok(String::from_utf8_lossy(&outcome.stdout).into_owned())
}
129
/// Record `event` with `perf record` for `duration_secs` seconds, then decode
/// the capture with `perf-decode` and return its cleaned-up output.
///
/// The recording runs under `sudo timeout -s SIGINT`, and `./perf.data` (the
/// default file perf records to) is made world-readable before decoding.
///
/// `perf-decode` must be installed on the machine:
/// git clone https://github.com/microsoft/LinuxTracepoints &&
/// cd LinuxTracepoints && mkdir build && cd build && cmake .. && make &&
/// sudo cp bin/perf-decode /usr/local/bin
///
/// # Errors
/// Returns the `io::Error` if any of the commands cannot be spawned.
///
/// # Panics
/// Panics if `perf record` exits with an unexpected code, or if `chmod` or
/// `perf-decode` exit unsuccessfully.
pub fn run_perf_and_decode(duration_secs: u64, event: &str) -> std::io::Result<String> {
    let duration = duration_secs.to_string();
    let record_status = Command::new("sudo")
        .arg("timeout")
        .args(["-s", "SIGINT"])
        .arg(&duration)
        .args(["perf", "record", "-e", event])
        .status()?;

    // The recording is stopped by timeout's SIGINT, so a non-zero exit is
    // expected here; only codes outside this known set count as failure.
    let stopped_by_timeout = matches!(record_status.code(), Some(124) | Some(130) | Some(143));
    if !record_status.success() && !stopped_by_timeout {
        panic!(
            "perf record failed with exit code: {:?}",
            record_status.code()
        );
    }

    // perf ran under sudo, so open up read access before decoding.
    let chmod_status = Command::new("sudo")
        .arg("chmod")
        .arg("uog+r")
        .arg("./perf.data")
        .status()?;
    if !chmod_status.success() {
        panic!("chmod failed with exit code: {:?}", chmod_status.code());
    }

    let decoded = Command::new("perf-decode").arg("./perf.data").output()?;
    if !decoded.status.success() {
        panic!(
            "perf-decode failed with exit code: {:?}",
            decoded.status.code()
        );
    }

    // Drop a leading Byte Order Mark (UTF-8 encoding EF BB BF) if present,
    // then strip surrounding whitespace.
    let text = String::from_utf8_lossy(&decoded.stdout);
    let without_bom = text.strip_prefix('\u{FEFF}').unwrap_or(&*text);

    Ok(without_bom.trim().to_string())
}
196
197        /// Extract metric data from different metric types
198        /// Returns a reference to the data points vector for the given metric type
199        /// TODO: Add support for more metric types like Histogram and ExponentialHistogram
200        /// This function assumes that the metric data is either Sum or Gauge type
201        pub fn extract_metric_data(
202            metric_data: &opentelemetry_proto::tonic::metrics::v1::metric::Data,
203            request_index: usize,
204        ) -> &Vec<opentelemetry_proto::tonic::metrics::v1::NumberDataPoint> {
205            match metric_data {
206                opentelemetry_proto::tonic::metrics::v1::metric::Data::Sum(sum) => &sum.data_points,
207                opentelemetry_proto::tonic::metrics::v1::metric::Data::Gauge(gauge) => {
208                    &gauge.data_points
209                }
210                // TODO: Add support for Histogram and ExponentialHistogram
211                // These will need special handling as they don't use NumberDataPoint:
212                // opentelemetry_proto::tonic::metrics::v1::metric::Data::Histogram(hist) => {
213                //     // Histogram uses HistogramDataPoint instead of NumberDataPoint
214                //     // Will need separate handling or abstraction
215                // }
216                // opentelemetry_proto::tonic::metrics::v1::metric::Data::ExponentialHistogram(exp_hist) => {
217                //     // ExponentialHistogram uses ExponentialHistogramDataPoint
218                //     // Will need separate handling or abstraction
219                // }
220                _ => panic!(
221                    "Unsupported metric data type in request {}",
222                    request_index + 1
223                ),
224            }
225        }
226
227        /// Extract and validate data point from metric data (supports Sum and Gauge types)
228        /// Returns the attributes from the single data point after validation
229        pub fn extract_and_validate_metric_data(
230            metric: &opentelemetry_proto::tonic::metrics::v1::Metric,
231            expected_value: u64,
232            request_index: usize,
233        ) -> Vec<opentelemetry::KeyValue> {
234            if let Some(data) = &metric.data {
235                // Use helper method to extract data points based on metric type
236                let data_points = extract_metric_data(data, request_index);
237
238                // Validate exactly one data point
239                assert_eq!(
240                    data_points.len(),
241                    1,
242                    "Request {} should have exactly one data point",
243                    request_index + 1
244                );
245
246                let data_point = &data_points[0];
247
248                // Validate counter value
249                if let Some(value) = &data_point.value {
250                    match value {
251                        opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsInt(int_val) => {
252                            assert_eq!(*int_val as u64, expected_value,
253                                "Counter value should match expected value in request {}", request_index + 1);
254                        }
255                        _ => panic!("Expected integer value for u64 counter in request {}", request_index + 1),
256                    }
257                }
258
259                // Extract attributes from data point
260                let mut actual_attributes: Vec<opentelemetry::KeyValue> = Vec::new();
261                for attr in &data_point.attributes {
262                    if let Some(value) = &attr.value {
263                        if let Some(string_value) = &value.value {
264                            match string_value {
265                                opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s) => {
266                                    actual_attributes.push(opentelemetry::KeyValue::new(attr.key.clone(), s.clone()));
267                                }
268                                _ => {
269                                    panic!("Unsupported attribute value type for key: {} in request {}", attr.key, request_index + 1);
270                                }
271                            }
272                        }
273                    }
274                }
275
276                // Sort attributes for consistent comparison
277                actual_attributes.sort_by(|a, b| a.key.as_str().cmp(b.key.as_str()));
278                actual_attributes
279            } else {
280                panic!("Metric data is missing in request {}", request_index + 1);
281            }
282        }
283    }
284
/// End-to-end check that metrics exported through `MetricsExporter` show up
/// as `user_events:otlp_metrics` events that decode back into the expected
/// OTLP requests.
///
/// Needs root, `perf`, `perf-decode` and kernel user_events support, so it is
/// ignored by default.
#[ignore]
#[test]
fn integration_test_basic() {
    // Run using the below command
    // sudo -E ~/.cargo/bin/cargo test integration_test_basic -- --nocapture --ignored
    use opentelemetry_proto::tonic::common::v1::any_value::Value as AnyValue;

    test_utils::check_user_events_available().expect("Kernel does not support user_events. Verify your distribution/kernel supports user_events: https://docs.kernel.org/trace/user_events.html.");

    let exporter = MetricsExporter::new();
    let provider = SdkMeterProvider::builder()
        .with_resource(
            Resource::builder_empty()
                .with_attributes(vec![KeyValue::new("service.name", "metric-demo")])
                .build(),
        )
        .with_periodic_exporter(exporter)
        .build();

    let meter = provider.meter("user-event-test");

    // Create a Counter Instrument.
    let counter = meter
        .u64_counter("counter_u64_test")
        .with_description("test_decription")
        .with_unit("test_unit")
        .build();

    counter.add(
        1,
        &[
            KeyValue::new("mykey1", "myvalue1"),
            KeyValue::new("mykey2", "myvalue2"),
        ],
    );

    counter.add(
        1,
        &[
            KeyValue::new("mykey1", "myvalueA"),
            KeyValue::new("mykey2", "myvalueB"),
        ],
    );

    let perf_thread = std::thread::spawn(move || {
        test_utils::run_perf_and_decode(5, "user_events:otlp_metrics")
    });

    // Give a little time for perf to start recording
    std::thread::sleep(std::time::Duration::from_millis(1000));

    provider
        .shutdown()
        .expect("Failed to shutdown meter provider");

    // `expect` keeps the underlying io::Error in the failure output instead
    // of reducing it to a bare "assertion failed".
    let json_content = perf_thread
        .join()
        .expect("Perf thread panicked")
        .expect("Failed to record and decode perf data");
    assert!(
        !json_content.is_empty(),
        "perf-decode produced no output"
    );

    let formatted_output = json_content.trim();
    println!("Formatted Output: {formatted_output}");

    // Extract and decode OTLP metrics from the JSON content
    let decoded_metrics = test_utils::extract_and_decode_otlp_metrics(formatted_output)
        .expect("Failed to extract and decode OTLP metrics");

    // Expected values from the test setup
    let expected_counter_name = "counter_u64_test";
    let expected_description = "test_decription";
    let expected_unit = "test_unit";
    let expected_value = 1u64;
    // Create expected attributes in sorted order (by key)
    let expected_attributes_1 = vec![
        KeyValue::new("mykey1", "myvalue1"),
        KeyValue::new("mykey2", "myvalue2"),
    ];
    let expected_attributes_2 = vec![
        KeyValue::new("mykey1", "myvalueA"),
        KeyValue::new("mykey2", "myvalueB"),
    ];
    let expected_service_name = "metric-demo";
    let expected_meter_name = "user-event-test";

    // STEP 1: Validate upfront that we have exactly 2 entries
    assert_eq!(
        decoded_metrics.len(),
        2,
        "Expected exactly 2 metrics payloads (one per data point)"
    );

    // STEP 2: Do common validation on both entries (resource, scope, metric metadata)
    for (index, metrics_request) in decoded_metrics.iter().enumerate() {
        println!(
            "Validating common elements for Metrics Request {}",
            index + 1
        );

        // Validate resource metrics structure
        assert!(
            !metrics_request.resource_metrics.is_empty(),
            "Metrics request {} should have resource metrics",
            index + 1
        );

        for resource_metric in &metrics_request.resource_metrics {
            // Validate resource attributes (service.name). An absent resource
            // or attribute is tolerated; a present one must match.
            let service_name = resource_metric
                .resource
                .as_ref()
                .and_then(|resource| {
                    resource
                        .attributes
                        .iter()
                        .find(|attr| attr.key == "service.name")
                })
                .and_then(|attr| attr.value.as_ref())
                .and_then(|any_value| any_value.value.as_ref());
            match service_name {
                Some(AnyValue::StringValue(s)) => {
                    assert_eq!(
                        s,
                        expected_service_name,
                        "Service name should match expected value in request {}",
                        index + 1
                    );
                }
                Some(_) => panic!(
                    "Service name attribute should be a string value in request {}",
                    index + 1
                ),
                None => {}
            }

            for scope_metric in &resource_metric.scope_metrics {
                // Validate scope/meter name
                if let Some(scope) = &scope_metric.scope {
                    assert_eq!(
                        scope.name,
                        expected_meter_name,
                        "Meter name should match expected value in request {}",
                        index + 1
                    );
                }

                // Validate metrics metadata (should be consistent across both requests).
                // The name is the selection criterion, so only the description
                // and unit need asserting.
                let counters = scope_metric
                    .metrics
                    .iter()
                    .filter(|metric| metric.name == expected_counter_name);
                for metric in counters {
                    assert_eq!(
                        metric.description,
                        expected_description,
                        "Metric description should match expected value in request {}",
                        index + 1
                    );
                    assert_eq!(
                        metric.unit,
                        expected_unit,
                        "Metric unit should match expected value in request {}",
                        index + 1
                    );
                }
            }
        }
    }

    // STEP 3: Validate that each entry has exactly one data point and collect attributes
    let mut actual_attribute_sets = Vec::new();

    for (index, metrics_request) in decoded_metrics.iter().enumerate() {
        println!("Validating data points for Metrics Request {}", index + 1);

        let counters = metrics_request
            .resource_metrics
            .iter()
            .flat_map(|resource_metric| resource_metric.scope_metrics.iter())
            .flat_map(|scope_metric| scope_metric.metrics.iter())
            .filter(|metric| metric.name == expected_counter_name);

        for metric in counters {
            // Use helper method to extract and validate metric data
            actual_attribute_sets.push(test_utils::extract_and_validate_metric_data(
                metric,
                expected_value,
                index,
            ));
        }
    }

    // STEP 4: Validate that both expected attribute sets are present (order independent)
    assert_eq!(
        actual_attribute_sets.len(),
        2,
        "Should have collected exactly 2 data points"
    );

    // Note: expected_attributes are already in sorted order by key, matching
    // the sorted sets returned by the helper.
    assert!(
        actual_attribute_sets.contains(&expected_attributes_1),
        "Should find data point with attributes: {expected_attributes_1:?}"
    );
    assert!(
        actual_attribute_sets.contains(&expected_attributes_2),
        "Should find data point with attributes: {expected_attributes_2:?}"
    );

    println!("Success!");
}
503}