opentelemetry_user_events_logs/
lib.rs1#![warn(missing_debug_implementations, missing_docs)]
132
133mod logs;
134
135pub use logs::Processor;
136pub use logs::ProcessorBuilder;
137
138#[cfg(feature = "experimental_eventname_callback")]
139pub use logs::EventNameCallback;
140
141#[cfg(test)]
142mod tests {
143
144 #[cfg(feature = "experimental_eventname_callback")]
145 use crate::EventNameCallback;
146 use crate::Processor;
147 use opentelemetry::trace::Tracer;
148 use opentelemetry::trace::{TraceContextExt, TracerProvider};
149 use opentelemetry::{Key, KeyValue};
150 use opentelemetry_appender_tracing::layer;
151 use opentelemetry_sdk::Resource;
152 use opentelemetry_sdk::{
153 logs::LoggerProviderBuilder,
154 trace::{Sampler, SdkTracerProvider},
155 };
156 use serde_json::{from_str, Value};
157 use std::process::Command;
158 use tracing::error;
159 use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer};
160
/// Emits one `error!` event through the tracing bridge while `perf` records the
/// `user_events:myprovider_L2K1` tracepoint, then checks the decoded event field by field.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are explicitly requested; the
/// helpers it calls shell out to `sudo`, `perf` and `perf-decode`.
#[ignore]
#[test]
fn integration_test_basic() {
    const NO_USER_EVENTS: &str = "Kernel does not support user_events. Verify your distribution/kernel supports user_events: https://docs.kernel.org/trace/user_events.html.";

    // Fail fast when the user_events status file cannot be read at all.
    check_user_events_available().expect(NO_USER_EVENTS);

    // Only two of the three resource attributes set below are listed for export.
    let processor = Processor::builder("myprovider")
        .with_resource_attributes(vec!["resource_attribute1", "resource_attribute2"])
        .build()
        .unwrap();

    let resource = Resource::builder()
        .with_service_name("myrolename")
        .with_attribute(KeyValue::new("resource_attribute1", "v1"))
        .with_attribute(KeyValue::new("resource_attribute2", "v2"))
        .with_attribute(KeyValue::new("resource_attribute3", "v3"))
        .build();
    let logger_provider = LoggerProviderBuilder::default()
        .with_resource(resource)
        .with_log_processor(processor)
        .build();

    // Once the provider exists, the status file is expected to list a tracepoint for
    // every level from L1 to L5.
    let status = check_user_events_available().expect(NO_USER_EVENTS);
    for level in 1..=5 {
        assert!(status.contains(format!("myprovider_L{level}K1").as_str()));
    }

    // Bridge `tracing` events into the logger provider, silencing the `opentelemetry` target.
    let filter = EnvFilter::new("info").add_directive("opentelemetry=off".parse().unwrap());
    let bridge = layer::OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter);
    let _guard = tracing::subscriber::set_default(tracing_subscriber::registry().with(bridge));

    // Record in the background, and wait a second before emitting so that the
    // recorder has time to start.
    let recorder = std::thread::spawn(|| run_perf_and_decode(5, "user_events:myprovider_L2K1"));
    std::thread::sleep(std::time::Duration::from_millis(1000));

    error!(
        name: "my-event-name",
        target: "my-target",
        event_id = 20,
        bool_field = true,
        double_field = 1.0,
        user_name = "otel user",
        user_email = "otel.user@opentelemetry.com",
        message = "This is a test message",
    );

    let result = recorder.join().expect("Perf thread panicked");
    assert!(result.is_ok());
    let json_content = result.unwrap();
    assert!(!json_content.is_empty());

    let json_value: Value = from_str(json_content.trim()).expect("Failed to parse JSON");

    // The decoded output is an object; the events live under the key that mentions
    // `perf.data`.
    let (_, recorded) = json_value
        .as_object()
        .expect("JSON is not an object")
        .iter()
        .find(|(key, _)| key.contains("perf.data"))
        .expect("No perf.data key found in JSON");
    let events = recorded
        .as_array()
        .expect("Events for perf.data is not an array");

    let event = events
        .iter()
        .find(|candidate| candidate.get("n").and_then(Value::as_str) == Some("myprovider:Log"))
        .expect("Event 'myprovider:Log' not found");

    assert_eq!(event["n"].as_str(), Some("myprovider:Log"));
    assert_eq!(event["__csver__"].as_i64(), Some(1024));

    // PartA: timestamp and the role derived from the service name.
    let part_a = &event["PartA"];
    assert!(part_a.get("time").is_some(), "PartA.time is missing");
    let role = part_a
        .get("ext_cloud_role")
        .expect("PartA.ext_cloud_role is missing");
    assert_eq!(role.as_str(), Some("myrolename"));

    // PartB: the well-known log fields.
    let part_b = &event["PartB"];
    assert_eq!(part_b["_typeName"].as_str(), Some("Log"));
    assert_eq!(part_b["severityNumber"].as_i64(), Some(17));
    assert_eq!(part_b["severityText"].as_str(), Some("ERROR"));
    assert_eq!(part_b["eventId"].as_i64(), Some(20));
    assert_eq!(part_b["name"].as_str(), Some("my-event-name"));
    assert_eq!(part_b["body"].as_str(), Some("This is a test message"));

    // PartC: event attributes plus the resource attributes selected for export.
    let part_c = &event["PartC"];
    assert_eq!(part_c["user_name"].as_str(), Some("otel user"));
    assert_eq!(part_c["resource_attribute1"].as_str(), Some("v1"));
    assert_eq!(part_c["resource_attribute2"].as_str(), Some("v2"));
    assert!(
        part_c.get("resource_attribute3").is_none(),
        "resource_attribute3 should not be present"
    );
    assert_eq!(
        part_c["user_email"].as_str(),
        Some("otel.user@opentelemetry.com")
    );
    assert_eq!(part_c["bool_field"].as_bool(), Some(true));
    assert_eq!(part_c["double_field"].as_f64(), Some(1.0));
}
304
/// Runs the shared callback scenario with a callback that returns the same event name
/// for every record, and expects the event to be decoded as `myprovider:MyEventName`.
#[cfg(feature = "experimental_eventname_callback")]
#[ignore]
#[test]
fn integration_test_callback_event_name() {
    use opentelemetry_sdk::logs::SdkLogRecord;

    /// Callback that ignores the log record and always answers `MyEventName`.
    #[derive(Debug, Clone)]
    struct ConstantEventName;

    impl EventNameCallback for ConstantEventName {
        fn get_name(&self, _record: &SdkLogRecord) -> &'static str {
            "MyEventName"
        }
    }

    let expected_event_name = "myprovider:MyEventName";
    integration_test_callback_event_name_helper(ConstantEventName, expected_event_name);
}
326
/// Runs the shared callback scenario with a callback that takes the event name from the
/// log record itself, and expects the event to be decoded as `myprovider:my-event-name`.
#[cfg(feature = "experimental_eventname_callback")]
#[ignore]
#[test]
fn integration_test_callback_event_name_from_logrecord() {
    use opentelemetry_sdk::logs::SdkLogRecord;

    /// Callback that prefers the record's own event name and falls back to `MyEventName`
    /// when the record has none.
    #[derive(Debug, Clone)]
    struct RecordEventName;

    impl EventNameCallback for RecordEventName {
        fn get_name(&self, log_record: &SdkLogRecord) -> &'static str {
            match log_record.event_name() {
                Some(name) => name,
                None => "MyEventName",
            }
        }
    }

    let expected_event_name = "myprovider:my-event-name";
    integration_test_callback_event_name_helper(RecordEventName, expected_event_name);
}
348
/// Shared scenario for the event-name-callback tests.
///
/// Builds a processor for provider `myprovider` that uses `event_name_callback`, emits
/// one `error!` event while `perf` records `user_events:myprovider_L2K1`, and then
/// verifies the decoded event whose `n` field equals `expected_event_name`.
///
/// # Panics
///
/// Panics (failing the calling test) when user_events is unavailable, when the capture
/// or decode step fails, or when any field of the decoded event differs from what the
/// scenario emitted.
#[cfg(feature = "experimental_eventname_callback")]
fn integration_test_callback_event_name_helper<C>(
    event_name_callback: C,
    expected_event_name: &'static str,
) where
    C: EventNameCallback + 'static,
{
    const NO_USER_EVENTS: &str = "Kernel does not support user_events. Verify your distribution/kernel supports user_events: https://docs.kernel.org/trace/user_events.html.";

    // Fail fast when the user_events status file cannot be read at all.
    check_user_events_available().expect(NO_USER_EVENTS);

    // Only `resource_attribute1` is listed for export; `resource_attribute2` is set on
    // the resource below but must not show up in the event.
    let user_event_processor = Processor::builder("myprovider")
        .with_resource_attributes(vec!["resource_attribute1"])
        .with_event_name_callback(event_name_callback)
        .build()
        .unwrap();

    let logger_provider = LoggerProviderBuilder::default()
        .with_resource(
            Resource::builder()
                .with_service_name("myrolename")
                .with_attribute(KeyValue::new("resource_attribute1", "v1"))
                .with_attribute(KeyValue::new("resource_attribute2", "v2"))
                .build(),
        )
        .with_log_processor(user_event_processor)
        .build();

    // Once the provider exists, the status file is expected to list a tracepoint for
    // every level from L1 to L5.
    let user_event_status = check_user_events_available().expect(NO_USER_EVENTS);
    for level in 1..=5 {
        let tracepoint = format!("myprovider_L{level}K1");
        assert!(
            user_event_status.contains(tracepoint.as_str()),
            "tracepoint {tracepoint} is not registered"
        );
    }

    // Bridge `tracing` events into the logger provider, silencing the `opentelemetry` target.
    let filter_otel = EnvFilter::new("info").add_directive("opentelemetry=off".parse().unwrap());
    let otel_layer = layer::OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter_otel);
    let subscriber = tracing_subscriber::registry().with(otel_layer);
    let _guard = tracing::subscriber::set_default(subscriber);

    // Record in the background, and wait a second before emitting so that the
    // recorder has time to start.
    let perf_thread = std::thread::spawn(|| run_perf_and_decode(5, "user_events:myprovider_L2K1"));
    std::thread::sleep(std::time::Duration::from_millis(1000));

    error!(
        name: "my-event-name",
        target: "my-target",
        event_id = 20,
        bool_field = true,
        double_field = 1.0,
        user_name = "otel user",
        user_email = "otel.user@opentelemetry.com",
        message = "This is a test message",
    );

    // `expect` keeps the underlying io::Error in the failure output instead of
    // reducing it to a bare "assertion failed".
    let json_content = perf_thread
        .join()
        .expect("Perf thread panicked")
        .expect("perf record/decode failed");
    assert!(!json_content.is_empty());

    let json_value: Value = from_str(json_content.trim()).expect("Failed to parse JSON");

    // The decoded output is an object; the events live under the key that mentions
    // `perf.data`.
    let (_, recorded) = json_value
        .as_object()
        .expect("JSON is not an object")
        .iter()
        .find(|(key, _)| key.contains("perf.data"))
        .expect("No perf.data key found in JSON");
    let events = recorded
        .as_array()
        .expect("Events for perf.data is not an array");

    // The callback decides the event name, so look the event up by the expected name.
    let event = events
        .iter()
        .find(|e| e.get("n").and_then(Value::as_str) == Some(expected_event_name))
        .unwrap_or_else(|| panic!("Event '{expected_event_name}' not found"));

    assert_eq!(event["__csver__"].as_i64().unwrap(), 1024);

    // PartA: timestamp and the role derived from the service name.
    let part_a = &event["PartA"];
    assert!(part_a.get("time").is_some(), "PartA.time is missing");
    let role = part_a
        .get("ext_cloud_role")
        .expect("PartA.ext_cloud_role is missing");
    assert_eq!(role.as_str().unwrap(), "myrolename");

    // PartB: the well-known log fields.
    let part_b = &event["PartB"];
    assert_eq!(part_b["_typeName"].as_str().unwrap(), "Log");
    assert_eq!(part_b["severityNumber"].as_i64().unwrap(), 17);
    assert_eq!(part_b["severityText"].as_str().unwrap(), "ERROR");
    assert_eq!(part_b["eventId"].as_i64().unwrap(), 20);
    assert_eq!(part_b["name"].as_str().unwrap(), "my-event-name");
    assert_eq!(part_b["body"].as_str().unwrap(), "This is a test message");

    // PartC: event attributes plus the single resource attribute selected for export.
    let part_c = &event["PartC"];
    assert_eq!(part_c["user_name"].as_str().unwrap(), "otel user");
    assert_eq!(part_c["resource_attribute1"].as_str().unwrap(), "v1");
    // `resource_attribute2` is the attribute that exists on the resource without being
    // listed for export, so it is the one whose absence proves the filtering.
    assert!(
        part_c.get("resource_attribute2").is_none(),
        "resource_attribute2 should not be present"
    );
    assert_eq!(
        part_c["user_email"].as_str().unwrap(),
        "otel.user@opentelemetry.com"
    );
    assert!(part_c["bool_field"].as_bool().unwrap());
    assert_eq!(part_c["double_field"].as_f64().unwrap(), 1.0);
}
488
/// Emits an `error!` event from inside an active span while `perf` records
/// `user_events:myprovider_L2K1`, then checks that the decoded event carries the span's
/// trace id and span id in PartA alongside the usual log fields.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are explicitly requested.
#[ignore]
#[test]
fn integration_test_with_tracing() {
    const NO_USER_EVENTS: &str = "Kernel does not support user_events. Verify your distribution/kernel supports user_events: https://docs.kernel.org/trace/user_events.html.";

    // Fail fast when the user_events status file cannot be read at all.
    check_user_events_available().expect(NO_USER_EVENTS);

    // Sample every span so the test span always has a recorded context.
    let tracer = SdkTracerProvider::builder()
        .with_sampler(Sampler::AlwaysOn)
        .build()
        .tracer("test-tracer");

    let processor = Processor::builder("myprovider").build().unwrap();
    let logger_provider = LoggerProviderBuilder::default()
        .with_log_processor(processor)
        .build();

    // Once the provider exists, the status file is expected to list a tracepoint for
    // every level from L1 to L5.
    let status = check_user_events_available().expect(NO_USER_EVENTS);
    for level in 1..=5 {
        assert!(status.contains(format!("myprovider_L{level}K1").as_str()));
    }

    // Bridge `tracing` events into the logger provider, silencing the `opentelemetry` target.
    let filter = EnvFilter::new("info").add_directive("opentelemetry=off".parse().unwrap());
    let bridge = layer::OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter);
    let _guard = tracing::subscriber::set_default(tracing_subscriber::registry().with(bridge));

    // Record in the background, and wait a second before emitting so that the
    // recorder has time to start.
    let recorder = std::thread::spawn(|| run_perf_and_decode(5, "user_events:myprovider_L2K1"));
    std::thread::sleep(std::time::Duration::from_millis(1000));

    // Log from inside the span and hand its ids back out for the comparison below.
    let (trace_id_expected, span_id_expected) = tracer.in_span("test-span", |cx| {
        let span = cx.span();
        let span_context = span.span_context();
        let ids = (span_context.trace_id(), span_context.span_id());

        error!(
            name: "my-event-name",
            target: "my-target",
            event_id = 20,
            user_name = "otel user",
            user_email = "otel.user@opentelemetry.com"
        );

        ids
    });

    let result = recorder.join().expect("Perf thread panicked");
    assert!(result.is_ok());
    let json_content = result.unwrap();
    assert!(!json_content.is_empty());

    let json_value: Value = from_str(json_content.trim()).expect("Failed to parse JSON");

    // The decoded output is an object; the events live under the key that mentions
    // `perf.data`.
    let (_, recorded) = json_value
        .as_object()
        .expect("JSON is not an object")
        .iter()
        .find(|(key, _)| key.contains("perf.data"))
        .expect("No perf.data key found in JSON");
    let events = recorded
        .as_array()
        .expect("Events for perf.data is not an array");

    let event = events
        .iter()
        .find(|candidate| candidate.get("n").and_then(Value::as_str) == Some("myprovider:Log"))
        .expect("Event 'myprovider:Log' not found");

    assert_eq!(event["n"].as_str(), Some("myprovider:Log"));
    assert_eq!(event["__csver__"].as_i64(), Some(1024));

    // PartA: timestamp plus the ids of the span that was active during the log call.
    let part_a = &event["PartA"];
    assert!(part_a.get("time").is_some(), "PartA.time is missing");

    let recorded_trace_id = part_a
        .get("ext_dt_traceId")
        .expect("PartA.ext_dt_traceId is missing");
    let recorded_span_id = part_a
        .get("ext_dt_spanId")
        .expect("PartA.ext_dt_spanId is missing");

    // Both ids are compared against their lower-hex rendering.
    let trace_id_hex = format!("{trace_id_expected:x}");
    let span_id_hex = format!("{span_id_expected:x}");
    assert_eq!(recorded_trace_id.as_str().unwrap(), trace_id_hex);
    assert_eq!(recorded_span_id.as_str().unwrap(), span_id_hex);

    // PartB: the well-known log fields.
    let part_b = &event["PartB"];
    assert_eq!(part_b["_typeName"].as_str(), Some("Log"));
    assert_eq!(part_b["severityNumber"].as_i64(), Some(17));
    assert_eq!(part_b["severityText"].as_str(), Some("ERROR"));
    assert_eq!(part_b["eventId"].as_i64(), Some(20));
    assert_eq!(part_b["name"].as_str(), Some("my-event-name"));

    // PartC: the event attributes.
    let part_c = &event["PartC"];
    assert_eq!(part_c["user_name"].as_str(), Some("otel user"));
    assert_eq!(
        part_c["user_email"].as_str(),
        Some("otel.user@opentelemetry.com")
    );
}
631
632 fn integration_test_direct_helper(severity: opentelemetry::logs::Severity, trace_point: &str) {
635 use opentelemetry::logs::AnyValue;
636 use opentelemetry::logs::LogRecord;
637 use opentelemetry::logs::Logger;
638 use opentelemetry::logs::LoggerProvider;
639
640 check_user_events_available().expect("Kernel does not support user_events. Verify your distribution/kernel supports user_events: https://docs.kernel.org/trace/user_events.html.");
642 let user_event_processor = Processor::builder("myprovider").build().unwrap();
643
644 let logger_provider = LoggerProviderBuilder::default()
645 .with_resource(Resource::builder().with_service_name("myrolename").build())
646 .with_log_processor(user_event_processor)
647 .build();
648
649 let logger = logger_provider.logger("test");
650
651 let mut record = logger.create_log_record();
652 record.set_severity_number(severity);
653 record.set_event_name("my-event-name");
654 record.set_target("my-target");
655 record.set_body(AnyValue::from("This is a test message"));
656 record.add_attribute("string_attr", "string value");
659
660 record.add_attribute("int_attr", 42i64);
662
663 record.add_attribute("double_attr", 3.575);
665
666 record.add_attribute("bool_attr", true);
668
669 let bytes_data = vec![1, 2, 3, 4, 5];
671 record.add_attribute("bytes_attr", AnyValue::Bytes(Box::new(bytes_data)));
672
673 let list_values = vec![AnyValue::Int(1), AnyValue::Int(2), AnyValue::Int(3)];
675 record.add_attribute("list_attr", AnyValue::ListAny(Box::new(list_values)));
676
677 let mut map_values = std::collections::HashMap::new();
679 map_values.insert(Key::new("key1"), AnyValue::String("value1".into()));
680 map_values.insert(Key::new("key2"), AnyValue::Int(42));
681 record.add_attribute("map_attr", AnyValue::Map(Box::new(map_values)));
682
683 let user_event_status = check_user_events_available().expect("Kernel does not support user_events. Verify your distribution/kernel supports user_events: https://docs.kernel.org/trace/user_events.html.");
687 assert!(user_event_status.contains("myprovider_L1K1"));
688 assert!(user_event_status.contains("myprovider_L2K1"));
689 assert!(user_event_status.contains("myprovider_L3K1"));
690 assert!(user_event_status.contains("myprovider_L4K1"));
691 assert!(user_event_status.contains("myprovider_L5K1"));
692
693 let trace_point_clone = trace_point.to_string();
695 let perf_thread =
696 std::thread::spawn(move || run_perf_and_decode(5, trace_point_clone.as_ref()));
697
698 std::thread::sleep(std::time::Duration::from_millis(1000));
700
701 logger.emit(record);
703
704 let result = perf_thread.join().expect("Perf thread panicked");
706
707 assert!(result.is_ok());
708 let json_content = result.unwrap();
709 assert!(!json_content.is_empty());
710
711 let formatted_output = json_content.trim().to_string();
712 let json_value: Value = from_str(&formatted_output).expect("Failed to parse JSON");
721 let perf_data_key = json_value
722 .as_object()
723 .expect("JSON is not an object")
724 .keys()
725 .find(|k| k.contains("perf.data"))
726 .expect("No perf.data key found in JSON");
727
728 let events = json_value[perf_data_key]
729 .as_array()
730 .expect("Events for perf.data is not an array");
731
732 let event = events
734 .iter()
735 .find(|e| {
736 if let Some(name) = e.get("n") {
737 name.as_str().unwrap_or("") == "myprovider:Log"
738 } else {
739 false
740 }
741 })
742 .expect("Event 'myprovider:Log' not found");
743
744 assert_eq!(event["n"].as_str().unwrap(), "myprovider:Log");
746 assert_eq!(event["__csver__"].as_i64().unwrap(), 1024);
747
748 let part_a = &event["PartA"];
750 assert!(part_a.get("time").is_some(), "PartA.time is missing");
752
753 let role = part_a
754 .get("ext_cloud_role")
755 .expect("PartA.ext_cloud_role is missing");
756 assert_eq!(role.as_str().unwrap(), "myrolename");
757
758 let part_b = &event["PartB"];
760 assert_eq!(part_b["_typeName"].as_str().unwrap(), "Log");
761 assert_eq!(part_b["severityNumber"].as_i64().unwrap(), severity as i64);
762 assert_eq!(part_b["name"].as_str().unwrap(), "my-event-name");
763 assert_eq!(part_b["body"].as_str().unwrap(), "This is a test message");
764
765 let part_c = &event["PartC"];
767 assert_eq!(part_c["string_attr"].as_str().unwrap(), "string value");
768 assert_eq!(part_c["int_attr"].as_i64().unwrap(), 42i64);
769 assert_eq!(part_c["double_attr"].as_f64().unwrap(), 3.575);
770 assert!(part_c["bool_attr"].as_bool().unwrap());
771
772 assert_eq!(part_c["bytes_attr"].as_str().unwrap(), "");
775 assert_eq!(part_c["list_attr"].as_str().unwrap(), "");
776 assert_eq!(part_c["map_attr"].as_str().unwrap(), "");
777 }
778
/// Runs the direct-logger scenario once per severity, pairing each severity with the
/// tracepoint it is expected to be written to (`Debug` with L5 through `Fatal` with L1).
///
/// Marked `#[ignore]`, so it only runs when ignored tests are explicitly requested.
#[ignore]
#[test]
fn integration_test_direct() {
    use opentelemetry::logs::Severity;

    let cases = [
        (Severity::Debug, "user_events:myprovider_L5K1"),
        (Severity::Info, "user_events:myprovider_L4K1"),
        (Severity::Warn, "user_events:myprovider_L3K1"),
        (Severity::Error, "user_events:myprovider_L2K1"),
        (Severity::Fatal, "user_events:myprovider_L1K1"),
    ];

    // Sequential on purpose: each run goes through one full capture before the next starts.
    for (severity, trace_point) in cases {
        integration_test_direct_helper(severity, trace_point);
    }
}
791
/// Reads `/sys/kernel/tracing/user_events_status` via `sudo cat`.
///
/// Returns the file contents (lossily decoded as UTF-8) when the command exits
/// successfully. Returns an error string when the command cannot be spawned, or when it
/// exits unsuccessfully, in which case the string includes the command's stderr.
fn check_user_events_available() -> Result<String, String> {
    let spawned = Command::new("sudo")
        .args(["cat", "/sys/kernel/tracing/user_events_status"])
        .output();

    let output = match spawned {
        Ok(output) => output,
        Err(e) => return Err(format!("Failed to execute command: {e}")),
    };

    if !output.status.success() {
        return Err(format!(
            "Command executed with failing error code: {}",
            String::from_utf8_lossy(&output.stderr)
        ));
    }

    Ok(String::from_utf8_lossy(&output.stdout).into_owned())
}
809
/// Records `event` with `perf record` for `duration_secs` seconds and returns the output
/// of `perf-decode` for the resulting `./perf.data`.
///
/// The recording runs under `sudo timeout -s SIGINT <duration_secs>`. Afterwards
/// `./perf.data` is made world-readable with `sudo chmod`, so that `perf-decode` can be
/// run without `sudo`. The decoded text is returned with a leading byte-order mark
/// removed and surrounding whitespace trimmed.
///
/// # Errors
///
/// Returns the `io::Error` from spawning any of the three commands.
///
/// # Panics
///
/// Panics when `perf record` fails with an exit code other than 124, 130 or 143, or
/// when `chmod` or `perf-decode` exits unsuccessfully.
pub fn run_perf_and_decode(duration_secs: u64, event: &str) -> std::io::Result<String> {
    let timeout_secs = duration_secs.to_string();
    let record_status = Command::new("sudo")
        .arg("timeout")
        .args(["-s", "SIGINT"])
        .arg(&timeout_secs)
        .args(["perf", "record", "-e", event])
        .status()?;

    // These exit codes are tolerated as the normal way for the recording to end:
    // presumably 124 is `timeout` reporting that the limit was hit, and 130 / 143 are
    // 128 + SIGINT / SIGTERM — confirm against the `timeout` documentation.
    let ended_by_signal = matches!(record_status.code(), Some(124 | 130 | 143));
    if !record_status.success() && !ended_by_signal {
        panic!(
            "perf record failed with exit code: {:?}",
            record_status.code()
        );
    }

    // The recording ran under sudo; grant read access to everyone before decoding.
    let chmod_status = Command::new("sudo")
        .arg("chmod")
        .arg("uog+r")
        .arg("./perf.data")
        .status()?;
    if !chmod_status.success() {
        panic!("chmod failed with exit code: {:?}", chmod_status.code());
    }

    let decode_output = Command::new("perf-decode").arg("./perf.data").output()?;
    if !decode_output.status.success() {
        panic!(
            "perf-decode failed with exit code: {:?}",
            decode_output.status.code()
        );
    }

    // Drop a leading BOM explicitly (`trim` does not treat U+FEFF as whitespace), then
    // trim; only the final result is allocated.
    let decoded = String::from_utf8_lossy(&decode_output.stdout);
    let without_bom = decoded.strip_prefix('\u{FEFF}').unwrap_or(&*decoded);

    Ok(without_bom.trim().to_string())
}
876}