opentelemetry_user_events_metrics/
lib.rs1mod exporter;
2mod tracepoint;
3
4pub use exporter::MetricsExporter;
5
6#[cfg(test)]
7mod tests {
8 use crate::MetricsExporter;
9 use opentelemetry::metrics::MeterProvider;
10 use opentelemetry::KeyValue;
11 use opentelemetry_sdk::metrics::SdkMeterProvider;
12 use opentelemetry_sdk::Resource;
13
14 mod test_utils {
15 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
16 use prost::Message;
17 use serde_json::{self, Value};
18 use std::process::Command;
19
/// One user-event entry taken from the JSON handed to `extract_user_events`.
///
/// Each field mirrors one key of the JSON event object read by
/// `parse_user_event_record`.
// `dead_code` is allowed because `protocol` and `version` are stored but not
// read by the code in this module.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct UserEventRecord {
    /// Event name, read from the `"n"` key.
    pub name: String,
    /// Value of the `"protocol"` key.
    pub protocol: u32,
    /// Value of the `"version"` key.
    pub version: String,
    /// Raw payload bytes, read from the `"buffer"` array.
    pub buffer: Vec<u8>,
}
29
30 pub fn extract_user_events(
32 json_content: &str,
33 ) -> Result<Vec<UserEventRecord>, Box<dyn std::error::Error>> {
34 let parsed: Value = serde_json::from_str(json_content)?;
35 let mut records = Vec::new();
36
37 if let Some(events_map) = parsed.as_object() {
39 for (_, events_value) in events_map {
40 if let Some(events_array) = events_value.as_array() {
41 for event in events_array {
42 if let Some(record) = parse_user_event_record(event)? {
43 records.push(record);
44 }
45 }
46 }
47 }
48 }
49
50 Ok(records)
51 }
52
53 fn parse_user_event_record(
55 event: &Value,
56 ) -> Result<Option<UserEventRecord>, Box<dyn std::error::Error>> {
57 let name = event["n"].as_str().unwrap_or("").to_string();
58 let protocol = event["protocol"].as_u64().unwrap_or(0) as u32;
59 let version = event["version"].as_str().unwrap_or("").to_string();
60
61 let buffer = if let Some(buffer_array) = event["buffer"].as_array() {
63 buffer_array
64 .iter()
65 .filter_map(|v| v.as_u64().map(|n| n as u8))
66 .collect()
67 } else {
68 Vec::new()
69 };
70
71 Ok(Some(UserEventRecord {
72 name,
73 protocol,
74 version,
75 buffer,
76 }))
77 }
78
79 fn decode_otlp_metrics(
81 buffer: &[u8],
82 ) -> Result<ExportMetricsServiceRequest, Box<dyn std::error::Error>> {
83 let request = ExportMetricsServiceRequest::decode(buffer)?;
84 Ok(request)
85 }
86
87 pub fn extract_and_decode_otlp_metrics(
89 json_content: &str,
90 ) -> Result<Vec<ExportMetricsServiceRequest>, Box<dyn std::error::Error>> {
91 let user_events = extract_user_events(json_content)?;
92 let mut decoded_metrics = Vec::new();
93
94 for event in user_events {
95 if event.name.contains("otlp_metrics") {
97 match decode_otlp_metrics(&event.buffer) {
98 Ok(metrics_request) => {
99 decoded_metrics.push(metrics_request);
100 }
101 Err(e) => {
102 eprintln!("Failed to decode OTLP metrics from buffer: {e}");
103 }
105 }
106 }
107 }
108
109 Ok(decoded_metrics)
110 }
111
/// Reads `/sys/kernel/tracing/user_events_status` via `sudo cat`.
///
/// Returns the command's stdout (lossily decoded as UTF-8) when it exits
/// successfully.
///
/// # Errors
///
/// Returns a message when the command cannot be started, or a message
/// containing the command's stderr when it exits with a failure status.
pub fn check_user_events_available() -> Result<String, String> {
    let spawned = Command::new("sudo")
        .args(["cat", "/sys/kernel/tracing/user_events_status"])
        .output();

    let output = match spawned {
        Ok(output) => output,
        Err(e) => return Err(format!("Failed to execute command: {e}")),
    };

    if !output.status.success() {
        return Err(format!(
            "Command executed with failing error code: {}",
            String::from_utf8_lossy(&output.stderr)
        ));
    }

    Ok(String::from_utf8_lossy(&output.stdout).into_owned())
}
129
/// Records `event` with `perf` for `duration_secs` seconds and returns the
/// output of `perf-decode` for the resulting `./perf.data`.
///
/// Steps:
/// 1. `sudo timeout -s SIGINT <duration_secs> perf record -e <event>`
/// 2. `sudo chmod uog+r ./perf.data`
/// 3. `perf-decode ./perf.data`
///
/// The decoder's stdout is lossily decoded as UTF-8, a leading byte-order
/// mark (U+FEFF) is removed, and surrounding whitespace is trimmed.
///
/// # Errors
///
/// Returns the `std::io::Error` raised when one of the commands cannot be
/// started.
///
/// # Panics
///
/// Panics when `perf record` exits unsuccessfully with a code other than 124,
/// 130 or 143, or when `chmod` or `perf-decode` exit unsuccessfully.
pub fn run_perf_and_decode(duration_secs: u64, event: &str) -> std::io::Result<String> {
    let time_limit = duration_secs.to_string();
    let record_status = Command::new("sudo")
        .arg("timeout")
        .args(["-s", "SIGINT"])
        .arg(&time_limit)
        .args(["perf", "record", "-e", event])
        .status()?;

    // 124 is what GNU `timeout` reports when the limit is hit; 130 and 143 are
    // 128 + SIGINT and 128 + SIGTERM. All three mean the recording was stopped
    // on purpose rather than having failed.
    let stopped_on_purpose = matches!(record_status.code(), Some(124 | 130 | 143));
    if !(record_status.success() || stopped_on_purpose) {
        panic!(
            "perf record failed with exit code: {:?}",
            record_status.code()
        );
    }

    // The recording was made through sudo; make it readable for the decoder,
    // which is run without sudo.
    let chmod_status = Command::new("sudo")
        .arg("chmod")
        .arg("uog+r")
        .arg("./perf.data")
        .status()?;
    if !chmod_status.success() {
        panic!("chmod failed with exit code: {:?}", chmod_status.code());
    }

    let decode_output = Command::new("perf-decode").arg("./perf.data").output()?;
    if !decode_output.status.success() {
        panic!(
            "perf-decode failed with exit code: {:?}",
            decode_output.status.code()
        );
    }

    let decoded = String::from_utf8_lossy(&decode_output.stdout);
    let without_bom = decoded.strip_prefix('\u{FEFF}').unwrap_or(&*decoded);

    Ok(without_bom.trim().to_string())
}
196
197 pub fn extract_metric_data(
202 metric_data: &opentelemetry_proto::tonic::metrics::v1::metric::Data,
203 request_index: usize,
204 ) -> &Vec<opentelemetry_proto::tonic::metrics::v1::NumberDataPoint> {
205 match metric_data {
206 opentelemetry_proto::tonic::metrics::v1::metric::Data::Sum(sum) => &sum.data_points,
207 opentelemetry_proto::tonic::metrics::v1::metric::Data::Gauge(gauge) => {
208 &gauge.data_points
209 }
210 _ => panic!(
221 "Unsupported metric data type in request {}",
222 request_index + 1
223 ),
224 }
225 }
226
227 pub fn extract_and_validate_metric_data(
230 metric: &opentelemetry_proto::tonic::metrics::v1::Metric,
231 expected_value: u64,
232 request_index: usize,
233 ) -> Vec<opentelemetry::KeyValue> {
234 if let Some(data) = &metric.data {
235 let data_points = extract_metric_data(data, request_index);
237
238 assert_eq!(
240 data_points.len(),
241 1,
242 "Request {} should have exactly one data point",
243 request_index + 1
244 );
245
246 let data_point = &data_points[0];
247
248 if let Some(value) = &data_point.value {
250 match value {
251 opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsInt(int_val) => {
252 assert_eq!(*int_val as u64, expected_value,
253 "Counter value should match expected value in request {}", request_index + 1);
254 }
255 _ => panic!("Expected integer value for u64 counter in request {}", request_index + 1),
256 }
257 }
258
259 let mut actual_attributes: Vec<opentelemetry::KeyValue> = Vec::new();
261 for attr in &data_point.attributes {
262 if let Some(value) = &attr.value {
263 if let Some(string_value) = &value.value {
264 match string_value {
265 opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s) => {
266 actual_attributes.push(opentelemetry::KeyValue::new(attr.key.clone(), s.clone()));
267 }
268 _ => {
269 panic!("Unsupported attribute value type for key: {} in request {}", attr.key, request_index + 1);
270 }
271 }
272 }
273 }
274 }
275
276 actual_attributes.sort_by(|a, b| a.key.as_str().cmp(b.key.as_str()));
278 actual_attributes
279 } else {
280 panic!("Metric data is missing in request {}", request_index + 1);
281 }
282 }
283 }
284
/// End-to-end test: records two counter measurements through
/// `MetricsExporter`, captures the `user_events:otlp_metrics` tracepoint with
/// `perf`, decodes the capture and checks the decoded OTLP requests.
///
/// Marked `#[ignore]`; it shells out to `sudo`, `perf` and `perf-decode`
/// through `test_utils`.
#[ignore]
#[test]
fn integration_test_basic() {
    use opentelemetry_proto::tonic::common::v1::any_value::Value as AnyValue;

    test_utils::check_user_events_available().expect("Kernel does not support user_events. Verify your distribution/kernel supports user_events: https://docs.kernel.org/trace/user_events.html.");

    let exporter = MetricsExporter::new();
    let provider = SdkMeterProvider::builder()
        .with_resource(
            Resource::builder_empty()
                .with_attributes(vec![KeyValue::new("service.name", "metric-demo")])
                .build(),
        )
        .with_periodic_exporter(exporter)
        .build();

    let meter = provider.meter("user-event-test");

    let counter = meter
        .u64_counter("counter_u64_test")
        .with_description("test_decription")
        .with_unit("test_unit")
        .build();

    // Two measurements with distinct attribute sets.
    counter.add(
        1,
        &[
            KeyValue::new("mykey1", "myvalue1"),
            KeyValue::new("mykey2", "myvalue2"),
        ],
    );
    counter.add(
        1,
        &[
            KeyValue::new("mykey1", "myvalueA"),
            KeyValue::new("mykey2", "myvalueB"),
        ],
    );

    // Record the tracepoint on a separate thread while this thread shuts the
    // provider down.
    let perf_thread = std::thread::spawn(move || {
        test_utils::run_perf_and_decode(5, "user_events:otlp_metrics")
    });

    // Presumably gives `perf record` time to start before the shutdown below
    // triggers the export.
    std::thread::sleep(std::time::Duration::from_millis(1000));

    provider
        .shutdown()
        .expect("Failed to shutdown meter provider");

    // `expect` (rather than `assert!(is_ok())` + `unwrap()`) so the underlying
    // error is part of the failure output.
    let json_content = perf_thread
        .join()
        .expect("Perf thread panicked")
        .expect("Failed to record and decode perf data");
    assert!(!json_content.is_empty());

    let formatted_output = json_content.trim();
    println!("Formatted Output: {formatted_output}");

    let decoded_metrics = test_utils::extract_and_decode_otlp_metrics(formatted_output)
        .expect("Failed to extract and decode OTLP metrics");

    // Expected values, mirroring what was configured above.
    let expected_counter_name = "counter_u64_test";
    let expected_description = "test_decription";
    let expected_unit = "test_unit";
    let expected_value = 1u64;
    let expected_attributes_1 = vec![
        KeyValue::new("mykey1", "myvalue1"),
        KeyValue::new("mykey2", "myvalue2"),
    ];
    let expected_attributes_2 = vec![
        KeyValue::new("mykey1", "myvalueA"),
        KeyValue::new("mykey2", "myvalueB"),
    ];
    let expected_service_name = "metric-demo";
    let expected_meter_name = "user-event-test";

    assert_eq!(
        decoded_metrics.len(),
        2,
        "Expected exactly 2 metrics payloads (one per data point)"
    );

    // Pass 1: resource, scope and metric metadata shared by every request.
    for (index, metrics_request) in decoded_metrics.iter().enumerate() {
        println!(
            "Validating common elements for Metrics Request {}",
            index + 1
        );

        assert!(
            !metrics_request.resource_metrics.is_empty(),
            "Metrics request {} should have resource metrics",
            index + 1
        );

        for resource_metric in &metrics_request.resource_metrics {
            // The service name is only checked when the resource actually
            // carries a `service.name` attribute with a value.
            let service_name = resource_metric
                .resource
                .as_ref()
                .and_then(|resource| {
                    resource
                        .attributes
                        .iter()
                        .find(|attr| attr.key == "service.name")
                })
                .and_then(|attr| attr.value.as_ref())
                .and_then(|value| value.value.as_ref());

            match service_name {
                Some(AnyValue::StringValue(s)) => {
                    assert_eq!(
                        s,
                        expected_service_name,
                        "Service name should match expected value in request {}",
                        index + 1
                    );
                }
                Some(_) => panic!(
                    "Service name attribute should be a string value in request {}",
                    index + 1
                ),
                None => {}
            }

            for scope_metric in &resource_metric.scope_metrics {
                if let Some(scope) = &scope_metric.scope {
                    assert_eq!(
                        scope.name,
                        expected_meter_name,
                        "Meter name should match expected value in request {}",
                        index + 1
                    );
                }

                // The metric is selected by name, so only its description and
                // unit need asserting; that the counter is present at all is
                // verified by the data-point count in pass 2.
                for metric in scope_metric
                    .metrics
                    .iter()
                    .filter(|metric| metric.name == expected_counter_name)
                {
                    assert_eq!(
                        metric.description,
                        expected_description,
                        "Metric description should match expected value in request {}",
                        index + 1
                    );
                    assert_eq!(
                        metric.unit,
                        expected_unit,
                        "Metric unit should match expected value in request {}",
                        index + 1
                    );
                }
            }
        }
    }

    // Pass 2: validate each data point and collect its attribute set.
    let mut actual_attribute_sets = Vec::new();

    for (index, metrics_request) in decoded_metrics.iter().enumerate() {
        println!("Validating data points for Metrics Request {}", index + 1);

        let counters = metrics_request
            .resource_metrics
            .iter()
            .flat_map(|resource_metric| resource_metric.scope_metrics.iter())
            .flat_map(|scope_metric| scope_metric.metrics.iter())
            .filter(|metric| metric.name == expected_counter_name);

        for metric in counters {
            let actual_attributes =
                test_utils::extract_and_validate_metric_data(metric, expected_value, index);
            actual_attribute_sets.push(actual_attributes);
        }
    }

    assert_eq!(
        actual_attribute_sets.len(),
        2,
        "Should have collected exactly 2 data points"
    );

    // The two payloads may arrive in either order, so look each expected
    // attribute set up instead of comparing by position.
    assert!(
        actual_attribute_sets.contains(&expected_attributes_1),
        "Should find data point with attributes: {expected_attributes_1:?}"
    );
    assert!(
        actual_attribute_sets.contains(&expected_attributes_2),
        "Should find data point with attributes: {expected_attributes_2:?}"
    );

    println!("Success!");
}
503}