1use async_trait::async_trait;
118use base64::{engine::general_purpose::STANDARD as base64_engine, Engine};
119use bon::bon;
120use flate2::{write::GzEncoder, Compression};
121use futures_util::future::BoxFuture;
122use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
123use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
124use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
125use opentelemetry_sdk::resource::Resource;
126use opentelemetry_sdk::{
127 error::OTelSdkError,
128 trace::{SpanData, SpanExporter},
129};
130use prost::Message;
131use serde::{Deserialize, Serialize};
132use std::{
133 collections::HashMap,
134 env,
135 fmt::Display,
136 fs::OpenOptions,
137 io::{self, Write},
138 path::PathBuf,
139 result::Result,
140 str::FromStr,
141 sync::Arc,
142};
143
144mod constants;
145use constants::{defaults, env_vars};
146
147pub mod consts {
149 pub use crate::constants::defaults;
155 pub use crate::constants::env_vars;
156 pub use crate::constants::resource_attributes;
157}
158
159const VERSION: &str = env!("CARGO_PKG_VERSION");
160
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
163pub enum LogLevel {
164 Debug,
166 #[default]
168 Info,
169 Warn,
171 Error,
173}
174
175impl FromStr for LogLevel {
176 type Err = String;
177
178 fn from_str(s: &str) -> Result<Self, Self::Err> {
179 match s.to_lowercase().as_str() {
180 "debug" => Ok(LogLevel::Debug),
181 "info" => Ok(LogLevel::Info),
182 "warn" | "warning" => Ok(LogLevel::Warn),
183 "error" => Ok(LogLevel::Error),
184 _ => Err(format!("Invalid log level: {}", s)),
185 }
186 }
187}
188
189impl Display for LogLevel {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 match self {
192 LogLevel::Debug => write!(f, "DEBUG"),
193 LogLevel::Info => write!(f, "INFO"),
194 LogLevel::Warn => write!(f, "WARN"),
195 LogLevel::Error => write!(f, "ERROR"),
196 }
197 }
198}
199
200trait Output: Send + Sync + std::fmt::Debug + std::any::Any {
205 fn write_line(&self, line: &str) -> Result<(), OTelSdkError>;
215}
216
217#[derive(Debug, Default)]
219struct StdOutput;
220
221impl Output for StdOutput {
222 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
223 let stdout = io::stdout();
225 let mut handle = stdout.lock();
226
227 writeln!(handle, "{}", line).map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
229
230 Ok(())
231 }
232}
233
234#[derive(Debug)]
236struct NamedPipeOutput {
237 path: PathBuf,
238}
239
240impl NamedPipeOutput {
241 fn new() -> Result<Self, OTelSdkError> {
242 let path_buf = PathBuf::from(defaults::PIPE_PATH);
243 if !path_buf.exists() {
244 log::warn!("Named pipe does not exist: {}", defaults::PIPE_PATH);
245 }
247
248 Ok(Self { path: path_buf })
249 }
250}
251
252impl Output for NamedPipeOutput {
253 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
254 let mut file = OpenOptions::new()
256 .write(true)
257 .open(&self.path)
258 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to open pipe: {}", e)))?;
259
260 writeln!(file, "{}", line).map_err(|e| {
262 OTelSdkError::InternalFailure(format!("Failed to write to pipe: {}", e))
263 })?;
264
265 Ok(())
266 }
267}
268
269fn create_output(use_pipe: bool) -> Arc<dyn Output> {
271 if use_pipe {
272 match NamedPipeOutput::new() {
273 Ok(output) => Arc::new(output),
274 Err(e) => {
275 log::warn!(
276 "Failed to create named pipe output: {}, falling back to stdout",
277 e
278 );
279 Arc::new(StdOutput)
280 }
281 }
282 } else {
283 Arc::new(StdOutput)
284 }
285}
286
287#[derive(Debug, Serialize, Deserialize)]
292pub struct ExporterOutput {
293 #[serde(rename = "__otel_otlp_stdout")]
295 pub version: String,
296 pub source: String,
298 pub endpoint: String,
300 pub method: String,
302 #[serde(rename = "content-type")]
304 pub content_type: String,
305 #[serde(rename = "content-encoding")]
307 pub content_encoding: String,
308 #[serde(skip_serializing_if = "ExporterOutput::is_headers_empty")]
310 pub headers: Option<HashMap<String, String>>,
311 pub payload: String,
313 pub base64: bool,
315 #[serde(skip_serializing_if = "Option::is_none")]
317 pub level: Option<String>,
318}
319
320impl ExporterOutput {
321 fn is_headers_empty(headers: &Option<HashMap<String, String>>) -> bool {
323 headers.as_ref().map_or(true, |h| h.is_empty())
324 }
325}
326
327#[derive(Debug)]
351pub struct OtlpStdoutSpanExporter {
352 compression_level: u8,
354 resource: Option<Resource>,
356 headers: Option<HashMap<String, String>>,
358 output: Arc<dyn Output>,
360 level: Option<LogLevel>,
362}
363
364impl Default for OtlpStdoutSpanExporter {
365 fn default() -> Self {
366 Self::builder().build()
367 }
368}
369#[bon]
370impl OtlpStdoutSpanExporter {
371 #[builder]
391 pub fn new(
392 compression_level: Option<u8>,
393 resource: Option<Resource>,
394 headers: Option<HashMap<String, String>>,
395 output: Option<Arc<dyn Output>>,
396 level: Option<LogLevel>,
397 pipe: Option<bool>,
398 ) -> Self {
399 let compression_level = match env::var(env_vars::COMPRESSION_LEVEL) {
401 Ok(value) => match value.parse::<u8>() {
402 Ok(level) if level <= 9 => level,
403 Ok(level) => {
404 log::warn!(
405 "Invalid value in {}: {} (must be 0-9), using fallback",
406 env_vars::COMPRESSION_LEVEL,
407 level
408 );
409 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
410 }
411 Err(_) => {
412 log::warn!(
413 "Failed to parse {}: {}, using fallback",
414 env_vars::COMPRESSION_LEVEL,
415 value
416 );
417 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
418 }
419 },
420 Err(_) => {
421 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
423 }
424 };
425
426 let headers = match headers {
428 Some(constructor_headers) => {
429 if let Some(env_headers) = Self::parse_headers() {
430 let mut merged = constructor_headers;
432 merged.extend(env_headers);
433 Some(merged)
434 } else {
435 Some(constructor_headers)
437 }
438 }
439 None => Self::parse_headers(), };
441
442 let level = match env::var(env_vars::LOG_LEVEL) {
444 Ok(value) => match LogLevel::from_str(&value) {
445 Ok(log_level) => Some(log_level),
446 Err(e) => {
447 log::warn!(
448 "Invalid log level in {}: {}, using fallback",
449 env_vars::LOG_LEVEL,
450 e
451 );
452 level
453 }
454 },
455 Err(_) => {
456 level
458 }
459 };
460
461 let use_pipe = match env::var(env_vars::OUTPUT_TYPE) {
463 Ok(value) => value.to_lowercase() == "pipe",
464 Err(_) => pipe.unwrap_or(false),
465 };
466
467 let output = output.unwrap_or_else(|| create_output(use_pipe));
469
470 Self {
471 compression_level,
472 resource,
473 headers,
474 output,
475 level,
476 }
477 }
478
479 fn get_service_name() -> String {
487 env::var(env_vars::SERVICE_NAME)
488 .or_else(|_| env::var(env_vars::AWS_LAMBDA_FUNCTION_NAME))
489 .unwrap_or_else(|_| defaults::SERVICE_NAME.to_string())
490 }
491
492 #[cfg(test)]
493 fn with_test_output() -> (Self, Arc<TestOutput>) {
494 let output = Arc::new(TestOutput::new());
495
496 let exporter = Self::builder().output(output.clone()).build();
498
499 (exporter, output)
500 }
501
502 fn parse_headers() -> Option<HashMap<String, String>> {
507 let get_headers = |var_name: &str| -> Option<HashMap<String, String>> {
509 env::var(var_name).ok().map(|header_str| {
510 let mut map = HashMap::new();
511 Self::parse_header_string(&header_str, &mut map);
512 map
513 })
514 };
515
516 let global_headers = get_headers("OTEL_EXPORTER_OTLP_HEADERS");
518 let trace_headers = get_headers("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
519
520 if global_headers.is_none() && trace_headers.is_none() {
522 return None;
523 }
524
525 let mut result = HashMap::new();
527
528 if let Some(headers) = global_headers {
530 result.extend(headers);
531 }
532
533 if let Some(headers) = trace_headers {
535 result.extend(headers);
536 }
537
538 if result.is_empty() {
540 None
541 } else {
542 Some(result)
543 }
544 }
545
546 fn parse_header_string(header_str: &str, headers: &mut HashMap<String, String>) {
553 for pair in header_str.split(',') {
554 if let Some((key, value)) = pair.split_once('=') {
555 let key = key.trim().to_lowercase();
556 if key != "content-type" && key != "content-encoding" {
558 headers.insert(key, value.trim().to_string());
559 }
560 }
561 }
562 }
563}
564
565#[async_trait]
566impl SpanExporter for OtlpStdoutSpanExporter {
567 fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, Result<(), OTelSdkError>> {
584 let result = (|| {
586 let resource = self
588 .resource
589 .clone()
590 .unwrap_or_else(|| opentelemetry_sdk::Resource::builder_empty().build());
591 let resource_attrs = ResourceAttributesWithSchema::from(&resource);
592 let resource_spans = group_spans_by_resource_and_scope(batch, &resource_attrs);
593 let request = ExportTraceServiceRequest { resource_spans };
594
595 let proto_bytes = request.encode_to_vec();
597
598 let mut encoder =
600 GzEncoder::new(Vec::new(), Compression::new(self.compression_level as u32));
601 encoder
602 .write_all(&proto_bytes)
603 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
604 let compressed_bytes = encoder
605 .finish()
606 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
607
608 let payload = base64_engine.encode(compressed_bytes);
610
611 let output_data = ExporterOutput {
613 version: VERSION.to_string(),
614 source: Self::get_service_name(),
615 endpoint: defaults::ENDPOINT.to_string(),
616 method: "POST".to_string(),
617 content_type: "application/x-protobuf".to_string(),
618 content_encoding: "gzip".to_string(),
619 headers: self.headers.clone(),
620 payload,
621 base64: true,
622 level: self.level.map(|l| l.to_string()),
623 };
624
625 self.output.write_line(
627 &serde_json::to_string(&output_data)
628 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?,
629 )?;
630
631 Ok(())
632 })();
633
634 Box::pin(std::future::ready(result))
636 }
637
638 fn shutdown(&mut self) -> Result<(), OTelSdkError> {
646 Ok(())
647 }
648
649 fn force_flush(&mut self) -> Result<(), OTelSdkError> {
657 Ok(())
658 }
659
660 fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
670 self.resource = Some(<opentelemetry_sdk::Resource as Into<Resource>>::into(
671 resource.clone(),
672 ));
673 }
674}
675
676#[cfg(doctest)]
677#[macro_use]
678extern crate doc_comment;
679
680#[cfg(doctest)]
681use doc_comment::doctest;
682
683#[cfg(doctest)]
684doctest!("../README.md", readme);
685
686#[cfg(test)]
687use std::sync::Mutex;
688
689#[cfg(test)]
691#[derive(Debug, Default)]
692struct TestOutput {
693 buffer: Arc<Mutex<Vec<String>>>,
694}
695
696#[cfg(test)]
697impl TestOutput {
698 fn new() -> Self {
699 Self {
700 buffer: Arc::new(Mutex::new(Vec::new())),
701 }
702 }
703
704 fn get_output(&self) -> Vec<String> {
705 self.buffer.lock().unwrap().clone()
706 }
707}
708
709#[cfg(test)]
710impl Output for TestOutput {
711 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
712 self.buffer.lock().unwrap().push(line.to_string());
713 Ok(())
714 }
715}
716
717#[cfg(test)]
718mod tests {
719 use super::*;
720 use opentelemetry::{
721 trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
722 InstrumentationScope, KeyValue,
723 };
724 use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks};
725 use serde_json::Value;
726 use serial_test::serial;
727 use std::time::SystemTime;
728
729 fn create_test_span() -> SpanData {
730 let trace_id_bytes = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42];
731 let span_id_bytes = [0, 0, 0, 0, 0, 0, 0, 123];
732 let parent_id_bytes = [0, 0, 0, 0, 0, 0, 0, 42];
733
734 let span_context = SpanContext::new(
735 TraceId::from_bytes(trace_id_bytes),
736 SpanId::from_bytes(span_id_bytes),
737 TraceFlags::default(),
738 false,
739 TraceState::default(),
740 );
741
742 SpanData {
743 span_context,
744 parent_span_id: SpanId::from_bytes(parent_id_bytes),
745 span_kind: SpanKind::Client,
746 name: "test-span".into(),
747 start_time: SystemTime::UNIX_EPOCH,
748 end_time: SystemTime::UNIX_EPOCH,
749 attributes: vec![KeyValue::new("test.key", "test-value")],
750 dropped_attributes_count: 0,
751 events: SpanEvents::default(),
752 links: SpanLinks::default(),
753 status: Status::Ok,
754 instrumentation_scope: InstrumentationScope::builder("test-library")
755 .with_version("1.0.0")
756 .with_schema_url("https://opentelemetry.io/schema/1.0.0")
757 .build(),
758 }
759 }
760
761 #[test]
762 fn test_parse_headers() {
763 std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", "key1=value1,key2=value2");
764 std::env::set_var(
765 "OTEL_EXPORTER_OTLP_TRACES_HEADERS",
766 "key2=override,key3=value3",
767 );
768
769 let headers = OtlpStdoutSpanExporter::parse_headers();
770
771 assert!(headers.is_some());
773 let headers = headers.unwrap();
774
775 assert_eq!(headers.get("key1").unwrap(), "value1");
776 assert_eq!(headers.get("key2").unwrap(), "override");
777 assert_eq!(headers.get("key3").unwrap(), "value3");
778
779 std::env::remove_var("OTEL_EXPORTER_OTLP_HEADERS");
781 std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
782 }
783
784 #[test]
785 fn test_service_name_resolution() {
786 std::env::set_var(env_vars::SERVICE_NAME, "otel-service");
788 std::env::set_var(env_vars::AWS_LAMBDA_FUNCTION_NAME, "lambda-function");
789 assert_eq!(OtlpStdoutSpanExporter::get_service_name(), "otel-service");
790
791 std::env::remove_var(env_vars::SERVICE_NAME);
793 assert_eq!(
794 OtlpStdoutSpanExporter::get_service_name(),
795 "lambda-function"
796 );
797
798 std::env::remove_var(env_vars::AWS_LAMBDA_FUNCTION_NAME);
800 assert_eq!(
801 OtlpStdoutSpanExporter::get_service_name(),
802 defaults::SERVICE_NAME
803 );
804 }
805
806 #[test]
807 fn test_compression_level_precedence() {
808 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
810 let exporter = OtlpStdoutSpanExporter::builder()
811 .compression_level(7)
812 .build();
813 assert_eq!(exporter.compression_level, 3);
814
815 std::env::set_var(env_vars::COMPRESSION_LEVEL, "invalid");
817 let exporter = OtlpStdoutSpanExporter::builder()
818 .compression_level(7)
819 .build();
820 assert_eq!(exporter.compression_level, 7);
821
822 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
824 let exporter = OtlpStdoutSpanExporter::builder()
825 .compression_level(7)
826 .build();
827 assert_eq!(exporter.compression_level, 7);
828
829 let exporter = OtlpStdoutSpanExporter::builder()
831 .compression_level(defaults::COMPRESSION_LEVEL)
832 .build();
833 assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
834 }
835
836 #[test]
837 fn test_new_uses_env_compression_level() {
838 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
840 let exporter = OtlpStdoutSpanExporter::default();
841 assert_eq!(exporter.compression_level, 3);
842
843 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
845 let exporter = OtlpStdoutSpanExporter::default();
846 assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
847 }
848
849 #[tokio::test]
850 #[serial]
851 async fn test_compression_level_affects_output_size() {
852 let mut spans = Vec::new();
854 for i in 0..100 {
855 let mut span = create_test_span();
856 span.attributes.push(KeyValue::new("index", i));
858 span.attributes
860 .push(KeyValue::new("data", "a".repeat(1000)));
861 spans.push(span);
862 }
863
864 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
866
867 let no_compression_output = Arc::new(TestOutput::new());
869 let mut no_compression_exporter = OtlpStdoutSpanExporter {
870 compression_level: 0,
871 resource: None,
872 output: no_compression_output.clone() as Arc<dyn Output>,
873 headers: None,
874 level: None,
875 };
876 let _ = no_compression_exporter.export(spans.clone()).await;
877 let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
878
879 let max_compression_output = Arc::new(TestOutput::new());
881 let mut max_compression_exporter = OtlpStdoutSpanExporter {
882 compression_level: 9,
883 resource: None,
884 output: max_compression_output.clone() as Arc<dyn Output>,
885 headers: None,
886 level: None,
887 };
888 let _ = max_compression_exporter.export(spans.clone()).await;
889 let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
890
891 assert!(no_compression_size > max_compression_size,
893 "Maximum compression (level 9) should produce output no larger than no compression (level 0). Got {} vs {}",
894 max_compression_size, no_compression_size);
895
896 let no_compression_spans = decode_and_count_spans(&no_compression_output.get_output()[0]);
898 let max_compression_spans = decode_and_count_spans(&max_compression_output.get_output()[0]);
899
900 assert_eq!(
901 no_compression_spans,
902 spans.len(),
903 "No compression output should contain all spans"
904 );
905 assert_eq!(
906 max_compression_spans,
907 spans.len(),
908 "Maximum compression output should contain all spans"
909 );
910 }
911
912 fn extract_payload_size(json_str: &str) -> usize {
914 let json: Value = serde_json::from_str(json_str).unwrap();
915 let payload = json["payload"].as_str().unwrap();
916 base64_engine.decode(payload).unwrap().len()
917 }
918
919 fn decode_and_count_spans(json_str: &str) -> usize {
921 let json: Value = serde_json::from_str(json_str).unwrap();
922 let payload = json["payload"].as_str().unwrap();
923 let decoded = base64_engine.decode(payload).unwrap();
924
925 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
926 let mut decompressed = Vec::new();
927 std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();
928
929 let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
930
931 let mut span_count = 0;
933 for resource_span in &request.resource_spans {
934 for scope_span in &resource_span.scope_spans {
935 span_count += scope_span.spans.len();
936 }
937 }
938
939 span_count
940 }
941
942 #[tokio::test]
943 async fn test_export_single_span() {
944 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
945 let span = create_test_span();
946
947 let result = exporter.export(vec![span]).await;
948 assert!(result.is_ok());
949
950 let output = output.get_output();
951 assert_eq!(output.len(), 1);
952
953 let json: Value = serde_json::from_str(&output[0]).unwrap();
955 assert_eq!(json["__otel_otlp_stdout"], VERSION);
956 assert_eq!(json["method"], "POST");
957 assert_eq!(json["content-type"], "application/x-protobuf");
958 assert_eq!(json["content-encoding"], "gzip");
959 assert_eq!(json["base64"], true);
960
961 let payload = json["payload"].as_str().unwrap();
963 let decoded = base64_engine.decode(payload).unwrap();
964
965 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
967 let mut decompressed = Vec::new();
968 std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();
969
970 let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
972 assert_eq!(request.resource_spans.len(), 1);
973 }
974
975 #[tokio::test]
976 async fn test_export_empty_batch() {
977 let mut exporter = OtlpStdoutSpanExporter::default();
978 let result = exporter.export(vec![]).await;
979 assert!(result.is_ok());
980 }
981
982 #[test]
983 #[serial]
984 fn test_gzip_level_configuration() {
985 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
987
988 let exporter = OtlpStdoutSpanExporter::builder()
990 .compression_level(9)
991 .build();
992 assert_eq!(exporter.compression_level, 9);
993 }
994
995 #[tokio::test]
996 #[serial]
997 async fn test_env_var_affects_export_compression() {
998 let span = create_test_span();
1000 let mut spans = Vec::new();
1001 for i in 0..100 {
1003 let mut span = span.clone();
1004 span.attributes
1006 .push(KeyValue::new(format!("test-key-{}", i), "a".repeat(1000)));
1007 spans.push(span);
1008 }
1009
1010 std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
1012 let no_compression_output = Arc::new(TestOutput::new());
1013 let mut no_compression_exporter = OtlpStdoutSpanExporter::builder()
1014 .compression_level(0)
1015 .build();
1016 no_compression_exporter.output = no_compression_output.clone() as Arc<dyn Output>;
1017 let _ = no_compression_exporter.export(spans.clone()).await;
1018 let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
1019
1020 std::env::set_var(env_vars::COMPRESSION_LEVEL, "9");
1022 let max_compression_output = Arc::new(TestOutput::new());
1023 let mut max_compression_exporter = OtlpStdoutSpanExporter::builder()
1024 .compression_level(9)
1025 .build();
1026 max_compression_exporter.output = max_compression_output.clone() as Arc<dyn Output>;
1027 let _ = max_compression_exporter.export(spans.clone()).await;
1028 let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
1029
1030 assert!(no_compression_size > max_compression_size,
1032 "Environment variable COMPRESSION_LEVEL=9 should produce smaller output than COMPRESSION_LEVEL=0. Got {} vs {}",
1033 max_compression_size, no_compression_size);
1034
1035 std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
1037 let explicit_output = Arc::new(TestOutput::new());
1038
1039 let mut explicit_exporter = OtlpStdoutSpanExporter::builder()
1041 .output(explicit_output.clone())
1042 .build();
1043
1044 let _ = explicit_exporter.export(spans.clone()).await;
1046 let explicit_size = extract_payload_size(&explicit_output.get_output()[0]);
1047
1048 assert!(explicit_size > max_compression_size,
1051 "Environment variable should take precedence over explicitly set level. Expected size closer to {} but got {}",
1052 no_compression_size, explicit_size);
1053
1054 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1056 }
1057
1058 #[tokio::test]
1059 #[serial]
1060 async fn test_environment_variable_precedence() {
1061 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
1063
1064 let exporter = OtlpStdoutSpanExporter::builder()
1067 .compression_level(9)
1068 .build();
1069 assert_eq!(exporter.compression_level, 3);
1070
1071 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1073 let exporter = OtlpStdoutSpanExporter::builder()
1074 .compression_level(9)
1075 .build();
1076 assert_eq!(exporter.compression_level, 9);
1077 }
1078
1079 #[test]
1080 fn test_exporter_output_deserialization() {
1081 let json_str = r#"{
1083 "__otel_otlp_stdout": "0.11.1",
1084 "source": "test-service",
1085 "endpoint": "http://localhost:4318/v1/traces",
1086 "method": "POST",
1087 "content-type": "application/x-protobuf",
1088 "content-encoding": "gzip",
1089 "headers": {
1090 "api-key": "test-key",
1091 "custom-header": "test-value"
1092 },
1093 "payload": "SGVsbG8gd29ybGQ=",
1094 "base64": true
1095 }"#;
1096
1097 let output: ExporterOutput = serde_json::from_str(json_str).unwrap();
1099
1100 assert_eq!(output.version, "0.11.1");
1102 assert_eq!(output.source, "test-service");
1103 assert_eq!(output.endpoint, "http://localhost:4318/v1/traces");
1104 assert_eq!(output.method, "POST");
1105 assert_eq!(output.content_type, "application/x-protobuf");
1106 assert_eq!(output.content_encoding, "gzip");
1107 assert_eq!(output.headers.as_ref().unwrap().len(), 2);
1108 assert_eq!(
1109 output.headers.as_ref().unwrap().get("api-key").unwrap(),
1110 "test-key"
1111 );
1112 assert_eq!(
1113 output
1114 .headers
1115 .as_ref()
1116 .unwrap()
1117 .get("custom-header")
1118 .unwrap(),
1119 "test-value"
1120 );
1121 assert_eq!(output.payload, "SGVsbG8gd29ybGQ=");
1122 assert!(output.base64);
1123
1124 let decoded = base64_engine.decode(&output.payload).unwrap();
1126 let payload_text = String::from_utf8(decoded).unwrap();
1127 assert_eq!(payload_text, "Hello world");
1128 }
1129
1130 #[test]
1131 fn test_exporter_output_deserialization_dynamic() {
1132 let version = "0.11.1".to_string();
1134 let service = "dynamic-service".to_string();
1135 let payload = base64_engine.encode("Dynamic payload");
1136
1137 let json_str = format!(
1139 r#"{{
1140 "__otel_otlp_stdout": "{}",
1141 "source": "{}",
1142 "endpoint": "http://localhost:4318/v1/traces",
1143 "method": "POST",
1144 "content-type": "application/x-protobuf",
1145 "content-encoding": "gzip",
1146 "headers": {{
1147 "dynamic-key": "dynamic-value"
1148 }},
1149 "payload": "{}",
1150 "base64": true
1151 }}"#,
1152 version, service, payload
1153 );
1154
1155 let output: ExporterOutput = serde_json::from_str(&json_str).unwrap();
1157
1158 assert_eq!(output.version, version);
1160 assert_eq!(output.source, service);
1161 assert_eq!(output.endpoint, "http://localhost:4318/v1/traces");
1162 assert_eq!(output.method, "POST");
1163 assert_eq!(output.content_type, "application/x-protobuf");
1164 assert_eq!(output.content_encoding, "gzip");
1165 assert_eq!(output.headers.as_ref().unwrap().len(), 1);
1166 assert_eq!(
1167 output.headers.as_ref().unwrap().get("dynamic-key").unwrap(),
1168 "dynamic-value"
1169 );
1170 assert_eq!(output.payload, payload);
1171 assert!(output.base64);
1172
1173 let decoded = base64_engine.decode(&output.payload).unwrap();
1175 let payload_text = String::from_utf8(decoded).unwrap();
1176 assert_eq!(payload_text, "Dynamic payload");
1177 }
1178
1179 #[test]
1180 fn test_log_level_from_str() {
1181 assert_eq!(LogLevel::from_str("debug").unwrap(), LogLevel::Debug);
1182 assert_eq!(LogLevel::from_str("DEBUG").unwrap(), LogLevel::Debug);
1183 assert_eq!(LogLevel::from_str("info").unwrap(), LogLevel::Info);
1184 assert_eq!(LogLevel::from_str("INFO").unwrap(), LogLevel::Info);
1185 assert_eq!(LogLevel::from_str("warn").unwrap(), LogLevel::Warn);
1186 assert_eq!(LogLevel::from_str("warning").unwrap(), LogLevel::Warn);
1187 assert_eq!(LogLevel::from_str("WARN").unwrap(), LogLevel::Warn);
1188 assert_eq!(LogLevel::from_str("error").unwrap(), LogLevel::Error);
1189 assert_eq!(LogLevel::from_str("ERROR").unwrap(), LogLevel::Error);
1190
1191 assert!(LogLevel::from_str("invalid").is_err());
1192 }
1193
1194 #[test]
1195 fn test_log_level_display() {
1196 assert_eq!(LogLevel::Debug.to_string(), "DEBUG");
1197 assert_eq!(LogLevel::Info.to_string(), "INFO");
1198 assert_eq!(LogLevel::Warn.to_string(), "WARN");
1199 assert_eq!(LogLevel::Error.to_string(), "ERROR");
1200 }
1201
1202 #[test]
1203 #[serial]
1204 fn test_log_level_from_env() {
1205 std::env::set_var(env_vars::LOG_LEVEL, "debug");
1207 let exporter = OtlpStdoutSpanExporter::default();
1208 assert_eq!(exporter.level, Some(LogLevel::Debug));
1209
1210 std::env::set_var(env_vars::LOG_LEVEL, "invalid");
1212 let exporter = OtlpStdoutSpanExporter::default();
1213 assert_eq!(exporter.level, None);
1214
1215 std::env::remove_var(env_vars::LOG_LEVEL);
1217 let exporter = OtlpStdoutSpanExporter::builder()
1218 .level(LogLevel::Error)
1219 .build();
1220 assert_eq!(exporter.level, Some(LogLevel::Error));
1221
1222 std::env::set_var(env_vars::LOG_LEVEL, "warn");
1224 let exporter = OtlpStdoutSpanExporter::builder()
1225 .level(LogLevel::Error)
1226 .build();
1227 assert_eq!(exporter.level, Some(LogLevel::Warn));
1228
1229 std::env::remove_var(env_vars::LOG_LEVEL);
1231 }
1232
1233 #[tokio::test]
1234 #[serial]
1235 async fn test_log_level_in_output() {
1236 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1238 exporter.level = Some(LogLevel::Debug);
1239 let span = create_test_span();
1240
1241 let result = exporter.export(vec![span]).await;
1242 assert!(result.is_ok());
1243
1244 let output_lines = output.get_output();
1245 assert_eq!(output_lines.len(), 1);
1246
1247 let json: Value = serde_json::from_str(&output_lines[0]).unwrap();
1249 assert_eq!(json["level"], "DEBUG");
1250
1251 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1253 exporter.level = None;
1254 let span = create_test_span();
1255
1256 let result = exporter.export(vec![span]).await;
1257 assert!(result.is_ok());
1258
1259 let output_lines = output.get_output();
1260 assert_eq!(output_lines.len(), 1);
1261
1262 let json: Value = serde_json::from_str(&output_lines[0]).unwrap();
1264 assert!(!json.as_object().unwrap().contains_key("level"));
1265 }
1266
1267 #[test]
1268 fn test_stdout_output() {
1269 let output = create_output(false);
1270 assert!(format!("{:?}", output).contains("StdOutput"));
1272 }
1273
1274 #[test]
1275 fn test_pipe_output() {
1276 let output = create_output(true);
1277 let debug_str = format!("{:?}", output);
1279 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1280 }
1281
1282 #[test]
1283 fn test_env_var_precedence() {
1284 let temp_dir = std::env::temp_dir();
1286 let path = temp_dir.join("test_pipe");
1287
1288 std::env::remove_var(env_vars::OUTPUT_TYPE);
1290
1291 std::env::set_var(env_vars::OUTPUT_TYPE, "pipe");
1293
1294 let exporter = OtlpStdoutSpanExporter::default();
1296
1297 let debug_str = format!("{:?}", exporter.output);
1299 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1300
1301 std::env::remove_var(env_vars::OUTPUT_TYPE);
1303 if path.exists() {
1304 let _ = std::fs::remove_file(path);
1305 }
1306 }
1307
1308 #[test]
1309 fn test_constructor_precedence() {
1310 let temp_dir = std::env::temp_dir();
1312 let path = temp_dir.join("test_pipe");
1313
1314 std::env::remove_var(env_vars::OUTPUT_TYPE);
1316
1317 let exporter = OtlpStdoutSpanExporter::builder().pipe(true).build();
1319
1320 let debug_str = format!("{:?}", exporter.output);
1322 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1323
1324 if path.exists() {
1326 let _ = std::fs::remove_file(path);
1327 }
1328 }
1329
1330 #[test]
1331 fn test_env_var_overrides_constructor() {
1332 let temp_dir = std::env::temp_dir();
1334 let path = temp_dir.join("test_pipe");
1335
1336 std::env::set_var(env_vars::OUTPUT_TYPE, "pipe");
1338
1339 let exporter = OtlpStdoutSpanExporter::builder().pipe(false).build();
1341
1342 let debug_str = format!("{:?}", exporter.output);
1344 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1345
1346 std::env::remove_var(env_vars::OUTPUT_TYPE);
1348 if path.exists() {
1349 let _ = std::fs::remove_file(path);
1350 }
1351 }
1352}