1use async_trait::async_trait;
118use base64::{engine::general_purpose::STANDARD as base64_engine, Engine};
119use bon::bon;
120use flate2::{write::GzEncoder, Compression};
121use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
122use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
123use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
124use opentelemetry_sdk::error::OTelSdkResult;
125use opentelemetry_sdk::resource::Resource;
126use opentelemetry_sdk::{
127 error::OTelSdkError,
128 trace::{SpanData, SpanExporter},
129};
130use prost::Message;
131use serde::{Deserialize, Serialize};
132use std::{
133 collections::HashMap,
134 env,
135 fmt::{self, Display},
136 fs::OpenOptions,
137 io::{self, Write},
138 path::PathBuf,
139 result::Result,
140 str::FromStr,
141 sync::{Arc, Mutex},
142};
143
144mod constants;
145use constants::{defaults, env_vars};
146
147pub mod consts {
149 pub use crate::constants::defaults;
155 pub use crate::constants::env_vars;
156 pub use crate::constants::resource_attributes;
157}
158
159const VERSION: &str = env!("CARGO_PKG_VERSION");
160
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
163pub enum LogLevel {
164 Debug,
166 #[default]
168 Info,
169 Warn,
171 Error,
173}
174
175impl FromStr for LogLevel {
176 type Err = String;
177
178 fn from_str(s: &str) -> Result<Self, Self::Err> {
179 match s.to_lowercase().as_str() {
180 "debug" => Ok(LogLevel::Debug),
181 "info" => Ok(LogLevel::Info),
182 "warn" | "warning" => Ok(LogLevel::Warn),
183 "error" => Ok(LogLevel::Error),
184 _ => Err(format!("Invalid log level: {}", s)),
185 }
186 }
187}
188
189impl Display for LogLevel {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 match self {
192 LogLevel::Debug => write!(f, "DEBUG"),
193 LogLevel::Info => write!(f, "INFO"),
194 LogLevel::Warn => write!(f, "WARN"),
195 LogLevel::Error => write!(f, "ERROR"),
196 }
197 }
198}
199
200pub trait Output: Send + Sync + std::fmt::Debug + std::any::Any {
205 fn write_line(&self, line: &str) -> Result<(), OTelSdkError>;
215
216 fn is_pipe(&self) -> bool;
221
222 fn touch_pipe(&self) -> Result<(), OTelSdkError> {
233 Ok(())
235 }
236}
237
238#[derive(Debug, Default)]
240struct StdOutput;
241
242impl Output for StdOutput {
243 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
244 let stdout = io::stdout();
246 let mut handle = stdout.lock();
247
248 writeln!(handle, "{}", line).map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
250
251 Ok(())
252 }
253
254 fn is_pipe(&self) -> bool {
255 false }
257}
258
259#[derive(Debug)]
261struct NamedPipeOutput {
262 path: PathBuf,
263}
264
265impl NamedPipeOutput {
266 fn new() -> Result<Self, OTelSdkError> {
267 let path_buf = PathBuf::from(defaults::PIPE_PATH);
268 if !path_buf.exists() {
269 log::warn!("Named pipe does not exist: {}", defaults::PIPE_PATH);
270 }
272
273 Ok(Self { path: path_buf })
274 }
275}
276
277impl Output for NamedPipeOutput {
278 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
279 let mut file = OpenOptions::new()
281 .write(true)
282 .open(&self.path)
283 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to open pipe: {}", e)))?;
284
285 writeln!(file, "{}", line).map_err(|e| {
287 OTelSdkError::InternalFailure(format!("Failed to write to pipe: {}", e))
288 })?;
289
290 Ok(())
291 }
292
293 fn is_pipe(&self) -> bool {
294 true }
296
297 fn touch_pipe(&self) -> Result<(), OTelSdkError> {
298 let _file = OpenOptions::new()
300 .write(true)
301 .open(&self.path)
302 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to touch pipe: {}", e)))?;
303 Ok(())
304 }
305}
306
307#[derive(Clone, Default)]
309pub struct BufferOutput {
310 buffer: Arc<Mutex<Vec<String>>>,
311}
312
313impl BufferOutput {
314 pub fn new() -> Self {
316 Self::default()
317 }
318
319 pub fn take_lines(&self) -> Result<Vec<String>, OTelSdkError> {
321 let mut guard = self.buffer.lock().map_err(|e| {
322 OTelSdkError::InternalFailure(format!(
323 "Failed to lock buffer mutex for take_lines: {}",
324 e
325 ))
326 })?;
327 Ok(std::mem::take(&mut *guard)) }
329}
330
331impl Output for BufferOutput {
332 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
333 let mut guard = self.buffer.lock().map_err(|e| {
334 OTelSdkError::InternalFailure(format!(
335 "Failed to lock buffer mutex for write_line: {}",
336 e
337 ))
338 })?;
339 guard.push(line.to_string());
340 Ok(())
341 }
342
343 fn is_pipe(&self) -> bool {
344 false }
346}
347
348impl fmt::Debug for BufferOutput {
350 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
351 f.debug_struct("BufferOutput").finish_non_exhaustive()
353 }
354}
355
356fn create_output(use_pipe: bool) -> Arc<dyn Output> {
358 if use_pipe {
359 match NamedPipeOutput::new() {
360 Ok(output) => Arc::new(output),
361 Err(e) => {
362 log::warn!(
363 "Failed to create named pipe output: {}, falling back to stdout",
364 e
365 );
366 Arc::new(StdOutput)
367 }
368 }
369 } else {
370 Arc::new(StdOutput)
371 }
372}
373
374#[derive(Debug, Serialize, Deserialize)]
379pub struct ExporterOutput {
380 #[serde(rename = "__otel_otlp_stdout")]
382 pub version: String,
383 pub source: String,
385 pub endpoint: String,
387 pub method: String,
389 #[serde(rename = "content-type")]
391 pub content_type: String,
392 #[serde(rename = "content-encoding")]
394 pub content_encoding: String,
395 #[serde(skip_serializing_if = "ExporterOutput::is_headers_empty")]
397 pub headers: Option<HashMap<String, String>>,
398 pub payload: String,
400 pub base64: bool,
402 #[serde(skip_serializing_if = "Option::is_none")]
404 pub level: Option<String>,
405}
406
407impl ExporterOutput {
408 fn is_headers_empty(headers: &Option<HashMap<String, String>>) -> bool {
410 headers.as_ref().map_or(true, |h| h.is_empty())
411 }
412}
413
414#[derive(Debug)]
438pub struct OtlpStdoutSpanExporter {
439 compression_level: u8,
441 resource: Option<Resource>,
443 headers: Option<HashMap<String, String>>,
445 output: Arc<dyn Output>,
447 level: Option<LogLevel>,
449}
450
451impl Default for OtlpStdoutSpanExporter {
452 fn default() -> Self {
453 Self::builder().build()
454 }
455}
456#[bon]
457impl OtlpStdoutSpanExporter {
458 #[builder]
478 pub fn new(
479 compression_level: Option<u8>,
480 resource: Option<Resource>,
481 headers: Option<HashMap<String, String>>,
482 output: Option<Arc<dyn Output>>,
483 level: Option<LogLevel>,
484 pipe: Option<bool>,
485 ) -> Self {
486 let compression_level = match env::var(env_vars::COMPRESSION_LEVEL) {
488 Ok(value) => match value.parse::<u8>() {
489 Ok(level) if level <= 9 => level,
490 Ok(level) => {
491 log::warn!(
492 "Invalid value in {}: {} (must be 0-9), using fallback",
493 env_vars::COMPRESSION_LEVEL,
494 level
495 );
496 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
497 }
498 Err(_) => {
499 log::warn!(
500 "Failed to parse {}: {}, using fallback",
501 env_vars::COMPRESSION_LEVEL,
502 value
503 );
504 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
505 }
506 },
507 Err(_) => {
508 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
510 }
511 };
512
513 let headers = match headers {
515 Some(constructor_headers) => {
516 if let Some(env_headers) = Self::parse_headers() {
517 let mut merged = constructor_headers;
519 merged.extend(env_headers);
520 Some(merged)
521 } else {
522 Some(constructor_headers)
524 }
525 }
526 None => Self::parse_headers(), };
528
529 let level = match env::var(env_vars::LOG_LEVEL) {
531 Ok(value) => match LogLevel::from_str(&value) {
532 Ok(log_level) => Some(log_level),
533 Err(e) => {
534 log::warn!(
535 "Invalid log level in {}: {}, using fallback",
536 env_vars::LOG_LEVEL,
537 e
538 );
539 level
540 }
541 },
542 Err(_) => {
543 level
545 }
546 };
547
548 let use_pipe = match env::var(env_vars::OUTPUT_TYPE) {
550 Ok(value) => value.to_lowercase() == "pipe",
551 Err(_) => pipe.unwrap_or(false),
552 };
553
554 let output = output.unwrap_or_else(|| create_output(use_pipe));
556
557 Self {
558 compression_level,
559 resource,
560 headers,
561 output,
562 level,
563 }
564 }
565
566 fn get_service_name() -> String {
574 env::var(env_vars::SERVICE_NAME)
575 .or_else(|_| env::var(env_vars::AWS_LAMBDA_FUNCTION_NAME))
576 .unwrap_or_else(|_| defaults::SERVICE_NAME.to_string())
577 }
578
579 #[cfg(test)]
580 fn with_test_output() -> (Self, Arc<TestOutput>) {
581 let output = Arc::new(TestOutput::new());
582
583 let exporter = Self::builder().output(output.clone()).build();
585
586 (exporter, output)
587 }
588
589 fn parse_headers() -> Option<HashMap<String, String>> {
594 let get_headers = |var_name: &str| -> Option<HashMap<String, String>> {
596 env::var(var_name).ok().map(|header_str| {
597 let mut map = HashMap::new();
598 Self::parse_header_string(&header_str, &mut map);
599 map
600 })
601 };
602
603 let global_headers = get_headers("OTEL_EXPORTER_OTLP_HEADERS");
605 let trace_headers = get_headers("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
606
607 if global_headers.is_none() && trace_headers.is_none() {
609 return None;
610 }
611
612 let mut result = HashMap::new();
614
615 if let Some(headers) = global_headers {
617 result.extend(headers);
618 }
619
620 if let Some(headers) = trace_headers {
622 result.extend(headers);
623 }
624
625 if result.is_empty() {
627 None
628 } else {
629 Some(result)
630 }
631 }
632
633 fn parse_header_string(header_str: &str, headers: &mut HashMap<String, String>) {
640 for pair in header_str.split(',') {
641 if let Some((key, value)) = pair.split_once('=') {
642 let key = key.trim().to_lowercase();
643 if key != "content-type" && key != "content-encoding" {
645 headers.insert(key, value.trim().to_string());
646 }
647 }
648 }
649 }
650}
651
652#[async_trait]
653impl SpanExporter for OtlpStdoutSpanExporter {
654 fn export(
671 &self,
672 batch: Vec<SpanData>,
673 ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
674 if batch.is_empty() && self.output.is_pipe() {
676 let touch_result = self.output.touch_pipe();
678 return Box::pin(std::future::ready(touch_result));
679 }
680
681 let result = (|| {
683 let resource = self
685 .resource
686 .clone()
687 .unwrap_or_else(|| opentelemetry_sdk::Resource::builder_empty().build());
688 let resource_attrs = ResourceAttributesWithSchema::from(&resource);
689 let resource_spans = group_spans_by_resource_and_scope(batch, &resource_attrs);
690 let request = ExportTraceServiceRequest { resource_spans };
691
692 let proto_bytes = request.encode_to_vec();
694
695 let mut encoder =
697 GzEncoder::new(Vec::new(), Compression::new(self.compression_level as u32));
698 encoder
699 .write_all(&proto_bytes)
700 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
701 let compressed_bytes = encoder
702 .finish()
703 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
704
705 let payload = base64_engine.encode(compressed_bytes);
707
708 let output_data = ExporterOutput {
710 version: VERSION.to_string(),
711 source: Self::get_service_name(),
712 endpoint: defaults::ENDPOINT.to_string(),
713 method: "POST".to_string(),
714 content_type: "application/x-protobuf".to_string(),
715 content_encoding: "gzip".to_string(),
716 headers: self.headers.clone(),
717 payload,
718 base64: true,
719 level: self.level.map(|l| l.to_string()),
720 };
721
722 self.output.write_line(
724 &serde_json::to_string(&output_data)
725 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?,
726 )?;
727
728 Ok(())
729 })();
730
731 Box::pin(std::future::ready(result))
733 }
734
735 fn shutdown(&mut self) -> Result<(), OTelSdkError> {
743 Ok(())
744 }
745
746 fn force_flush(&mut self) -> Result<(), OTelSdkError> {
754 Ok(())
755 }
756
757 fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
767 self.resource = Some(<opentelemetry_sdk::Resource as Into<Resource>>::into(
768 resource.clone(),
769 ));
770 }
771}
772
773#[cfg(doctest)]
774#[macro_use]
775extern crate doc_comment;
776
777#[cfg(doctest)]
778use doc_comment::doctest;
779
780#[cfg(doctest)]
781doctest!("../README.md", readme);
782
/// Test-only [`Output`] that records every written line in memory.
#[cfg(test)]
#[derive(Debug, Default)]
struct TestOutput {
    /// Lines captured so far.
    buffer: Arc<Mutex<Vec<String>>>,
}

#[cfg(test)]
impl TestOutput {
    /// Creates an empty recorder.
    fn new() -> Self {
        Self {
            buffer: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Returns a copy of all captured lines; panics if the mutex is poisoned.
    fn get_output(&self) -> Vec<String> {
        let captured = self.buffer.lock().unwrap();
        captured.to_vec()
    }
}

#[cfg(test)]
impl Output for TestOutput {
    /// Stores a copy of `line`; panics if the mutex is poisoned.
    fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
        let mut captured = self.buffer.lock().unwrap();
        captured.push(line.to_owned());
        Ok(())
    }

    /// The recorder is not a pipe.
    fn is_pipe(&self) -> bool {
        false
    }
}
816
817#[cfg(test)]
818mod tests {
819 use super::*;
820 use opentelemetry::{
821 trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
822 InstrumentationScope, KeyValue,
823 };
824 use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks};
825 use serde_json::Value;
826 use serial_test::serial;
827 use std::time::SystemTime;
828
829 fn create_test_span() -> SpanData {
830 let trace_id_bytes = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42];
831 let span_id_bytes = [0, 0, 0, 0, 0, 0, 0, 123];
832 let parent_id_bytes = [0, 0, 0, 0, 0, 0, 0, 42];
833
834 let span_context = SpanContext::new(
835 TraceId::from_bytes(trace_id_bytes),
836 SpanId::from_bytes(span_id_bytes),
837 TraceFlags::default(),
838 false,
839 TraceState::default(),
840 );
841
842 SpanData {
843 span_context,
844 parent_span_id: SpanId::from_bytes(parent_id_bytes),
845 span_kind: SpanKind::Client,
846 name: "test-span".into(),
847 start_time: SystemTime::UNIX_EPOCH,
848 end_time: SystemTime::UNIX_EPOCH,
849 attributes: vec![KeyValue::new("test.key", "test-value")],
850 dropped_attributes_count: 0,
851 events: SpanEvents::default(),
852 links: SpanLinks::default(),
853 status: Status::Ok,
854 instrumentation_scope: InstrumentationScope::builder("test-library")
855 .with_version("1.0.0")
856 .with_schema_url("https://opentelemetry.io/schema/1.0.0")
857 .build(),
858 }
859 }
860
861 #[test]
862 fn test_parse_headers() {
863 std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", "key1=value1,key2=value2");
864 std::env::set_var(
865 "OTEL_EXPORTER_OTLP_TRACES_HEADERS",
866 "key2=override,key3=value3",
867 );
868
869 let headers = OtlpStdoutSpanExporter::parse_headers();
870
871 assert!(headers.is_some());
873 let headers = headers.unwrap();
874
875 assert_eq!(headers.get("key1").unwrap(), "value1");
876 assert_eq!(headers.get("key2").unwrap(), "override");
877 assert_eq!(headers.get("key3").unwrap(), "value3");
878
879 std::env::remove_var("OTEL_EXPORTER_OTLP_HEADERS");
881 std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
882 }
883
884 #[test]
885 fn test_service_name_resolution() {
886 std::env::set_var(env_vars::SERVICE_NAME, "otel-service");
888 std::env::set_var(env_vars::AWS_LAMBDA_FUNCTION_NAME, "lambda-function");
889 assert_eq!(OtlpStdoutSpanExporter::get_service_name(), "otel-service");
890
891 std::env::remove_var(env_vars::SERVICE_NAME);
893 assert_eq!(
894 OtlpStdoutSpanExporter::get_service_name(),
895 "lambda-function"
896 );
897
898 std::env::remove_var(env_vars::AWS_LAMBDA_FUNCTION_NAME);
900 assert_eq!(
901 OtlpStdoutSpanExporter::get_service_name(),
902 defaults::SERVICE_NAME
903 );
904 }
905
906 #[test]
907 fn test_compression_level_precedence() {
908 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
910 let exporter = OtlpStdoutSpanExporter::builder()
911 .compression_level(7)
912 .build();
913 assert_eq!(exporter.compression_level, 3);
914
915 std::env::set_var(env_vars::COMPRESSION_LEVEL, "invalid");
917 let exporter = OtlpStdoutSpanExporter::builder()
918 .compression_level(7)
919 .build();
920 assert_eq!(exporter.compression_level, 7);
921
922 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
924 let exporter = OtlpStdoutSpanExporter::builder()
925 .compression_level(7)
926 .build();
927 assert_eq!(exporter.compression_level, 7);
928
929 let exporter = OtlpStdoutSpanExporter::builder()
931 .compression_level(defaults::COMPRESSION_LEVEL)
932 .build();
933 assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
934 }
935
936 #[test]
937 fn test_new_uses_env_compression_level() {
938 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
940 let exporter = OtlpStdoutSpanExporter::default();
941 assert_eq!(exporter.compression_level, 3);
942
943 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
945 let exporter = OtlpStdoutSpanExporter::default();
946 assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
947 }
948
949 #[tokio::test]
950 #[serial]
951 async fn test_compression_level_affects_output_size() {
952 let mut spans = Vec::new();
954 for i in 0..100 {
955 let mut span = create_test_span();
956 span.attributes.push(KeyValue::new("index", i));
958 span.attributes
960 .push(KeyValue::new("data", "a".repeat(1000)));
961 spans.push(span);
962 }
963
964 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
966
967 let no_compression_output = Arc::new(TestOutput::new());
969 let no_compression_exporter = OtlpStdoutSpanExporter {
970 compression_level: 0,
971 resource: None,
972 output: no_compression_output.clone() as Arc<dyn Output>,
973 headers: None,
974 level: None,
975 };
976 let _ = no_compression_exporter.export(spans.clone()).await;
977 let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
978
979 let max_compression_output = Arc::new(TestOutput::new());
981 let max_compression_exporter = OtlpStdoutSpanExporter {
982 compression_level: 9,
983 resource: None,
984 output: max_compression_output.clone() as Arc<dyn Output>,
985 headers: None,
986 level: None,
987 };
988 let _ = max_compression_exporter.export(spans.clone()).await;
989 let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
990
991 assert!(no_compression_size > max_compression_size,
993 "Maximum compression (level 9) should produce output no larger than no compression (level 0). Got {} vs {}",
994 max_compression_size, no_compression_size);
995
996 let no_compression_spans = decode_and_count_spans(&no_compression_output.get_output()[0]);
998 let max_compression_spans = decode_and_count_spans(&max_compression_output.get_output()[0]);
999
1000 assert_eq!(
1001 no_compression_spans,
1002 spans.len(),
1003 "No compression output should contain all spans"
1004 );
1005 assert_eq!(
1006 max_compression_spans,
1007 spans.len(),
1008 "Maximum compression output should contain all spans"
1009 );
1010 }
1011
1012 fn extract_payload_size(json_str: &str) -> usize {
1014 let json: Value = serde_json::from_str(json_str).unwrap();
1015 let payload = json["payload"].as_str().unwrap();
1016 base64_engine.decode(payload).unwrap().len()
1017 }
1018
1019 fn decode_and_count_spans(json_str: &str) -> usize {
1021 let json: Value = serde_json::from_str(json_str).unwrap();
1022 let payload = json["payload"].as_str().unwrap();
1023 let decoded = base64_engine.decode(payload).unwrap();
1024
1025 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
1026 let mut decompressed = Vec::new();
1027 std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();
1028
1029 let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
1030
1031 let mut span_count = 0;
1033 for resource_span in &request.resource_spans {
1034 for scope_span in &resource_span.scope_spans {
1035 span_count += scope_span.spans.len();
1036 }
1037 }
1038
1039 span_count
1040 }
1041
1042 #[tokio::test]
1043 async fn test_export_single_span() {
1044 let (exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1045 let span = create_test_span();
1046
1047 let result = exporter.export(vec![span]).await;
1048 assert!(result.is_ok());
1049
1050 let output = output.get_output();
1051 assert_eq!(output.len(), 1);
1052
1053 let json: Value = serde_json::from_str(&output[0]).unwrap();
1055 assert_eq!(json["__otel_otlp_stdout"], VERSION);
1056 assert_eq!(json["method"], "POST");
1057 assert_eq!(json["content-type"], "application/x-protobuf");
1058 assert_eq!(json["content-encoding"], "gzip");
1059 assert_eq!(json["base64"], true);
1060
1061 let payload = json["payload"].as_str().unwrap();
1063 let decoded = base64_engine.decode(payload).unwrap();
1064
1065 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
1067 let mut decompressed = Vec::new();
1068 std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();
1069
1070 let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
1072 assert_eq!(request.resource_spans.len(), 1);
1073 }
1074
1075 #[tokio::test]
1076 async fn test_export_empty_batch() {
1077 let exporter = OtlpStdoutSpanExporter::default();
1078 let result = exporter.export(vec![]).await;
1079 assert!(result.is_ok());
1080 }
1081
1082 #[test]
1083 #[serial]
1084 fn test_gzip_level_configuration() {
1085 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1087
1088 let exporter = OtlpStdoutSpanExporter::builder()
1090 .compression_level(9)
1091 .build();
1092 assert_eq!(exporter.compression_level, 9);
1093 }
1094
1095 #[tokio::test]
1096 #[serial]
1097 async fn test_env_var_affects_export_compression() {
1098 let span = create_test_span();
1100 let mut spans = Vec::new();
1101 for i in 0..100 {
1103 let mut span = span.clone();
1104 span.attributes
1106 .push(KeyValue::new(format!("test-key-{}", i), "a".repeat(1000)));
1107 spans.push(span);
1108 }
1109
1110 std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
1112 let no_compression_output = Arc::new(TestOutput::new());
1113 let mut no_compression_exporter = OtlpStdoutSpanExporter::builder()
1114 .compression_level(0)
1115 .build();
1116 no_compression_exporter.output = no_compression_output.clone() as Arc<dyn Output>;
1117 let _ = no_compression_exporter.export(spans.clone()).await;
1118 let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
1119
1120 std::env::set_var(env_vars::COMPRESSION_LEVEL, "9");
1122 let max_compression_output = Arc::new(TestOutput::new());
1123 let mut max_compression_exporter = OtlpStdoutSpanExporter::builder()
1124 .compression_level(9)
1125 .build();
1126 max_compression_exporter.output = max_compression_output.clone() as Arc<dyn Output>;
1127 let _ = max_compression_exporter.export(spans.clone()).await;
1128 let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
1129
1130 assert!(no_compression_size > max_compression_size,
1132 "Environment variable COMPRESSION_LEVEL=9 should produce smaller output than COMPRESSION_LEVEL=0. Got {} vs {}",
1133 max_compression_size, no_compression_size);
1134
1135 std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
1137 let explicit_output = Arc::new(TestOutput::new());
1138
1139 let explicit_exporter = OtlpStdoutSpanExporter::builder()
1141 .output(explicit_output.clone())
1142 .build();
1143
1144 let _ = explicit_exporter.export(spans.clone()).await;
1146 let explicit_size = extract_payload_size(&explicit_output.get_output()[0]);
1147
1148 assert!(explicit_size > max_compression_size,
1151 "Environment variable should take precedence over explicitly set level. Expected size closer to {} but got {}",
1152 no_compression_size, explicit_size);
1153
1154 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1156 }
1157
1158 #[tokio::test]
1159 #[serial]
1160 async fn test_environment_variable_precedence() {
1161 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
1163
1164 let exporter = OtlpStdoutSpanExporter::builder()
1167 .compression_level(9)
1168 .build();
1169 assert_eq!(exporter.compression_level, 3);
1170
1171 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1173 let exporter = OtlpStdoutSpanExporter::builder()
1174 .compression_level(9)
1175 .build();
1176 assert_eq!(exporter.compression_level, 9);
1177 }
1178
1179 #[test]
1180 fn test_exporter_output_deserialization() {
1181 let json_str = r#"{
1183 "__otel_otlp_stdout": "0.11.1",
1184 "source": "test-service",
1185 "endpoint": "http://localhost:4318/v1/traces",
1186 "method": "POST",
1187 "content-type": "application/x-protobuf",
1188 "content-encoding": "gzip",
1189 "headers": {
1190 "api-key": "test-key",
1191 "custom-header": "test-value"
1192 },
1193 "payload": "SGVsbG8gd29ybGQ=",
1194 "base64": true
1195 }"#;
1196
1197 let output: ExporterOutput = serde_json::from_str(json_str).unwrap();
1199
1200 assert_eq!(output.version, "0.11.1");
1202 assert_eq!(output.source, "test-service");
1203 assert_eq!(output.endpoint, "http://localhost:4318/v1/traces");
1204 assert_eq!(output.method, "POST");
1205 assert_eq!(output.content_type, "application/x-protobuf");
1206 assert_eq!(output.content_encoding, "gzip");
1207 assert_eq!(output.headers.as_ref().unwrap().len(), 2);
1208 assert_eq!(
1209 output.headers.as_ref().unwrap().get("api-key").unwrap(),
1210 "test-key"
1211 );
1212 assert_eq!(
1213 output
1214 .headers
1215 .as_ref()
1216 .unwrap()
1217 .get("custom-header")
1218 .unwrap(),
1219 "test-value"
1220 );
1221 assert_eq!(output.payload, "SGVsbG8gd29ybGQ=");
1222 assert!(output.base64);
1223
1224 let decoded = base64_engine.decode(&output.payload).unwrap();
1226 let payload_text = String::from_utf8(decoded).unwrap();
1227 assert_eq!(payload_text, "Hello world");
1228 }
1229
1230 #[test]
1231 fn test_exporter_output_deserialization_dynamic() {
1232 let version = "0.11.1".to_string();
1234 let service = "dynamic-service".to_string();
1235 let payload = base64_engine.encode("Dynamic payload");
1236
1237 let json_str = format!(
1239 r#"{{
1240 "__otel_otlp_stdout": "{}",
1241 "source": "{}",
1242 "endpoint": "http://localhost:4318/v1/traces",
1243 "method": "POST",
1244 "content-type": "application/x-protobuf",
1245 "content-encoding": "gzip",
1246 "headers": {{
1247 "dynamic-key": "dynamic-value"
1248 }},
1249 "payload": "{}",
1250 "base64": true
1251 }}"#,
1252 version, service, payload
1253 );
1254
1255 let output: ExporterOutput = serde_json::from_str(&json_str).unwrap();
1257
1258 assert_eq!(output.version, version);
1260 assert_eq!(output.source, service);
1261 assert_eq!(output.endpoint, "http://localhost:4318/v1/traces");
1262 assert_eq!(output.method, "POST");
1263 assert_eq!(output.content_type, "application/x-protobuf");
1264 assert_eq!(output.content_encoding, "gzip");
1265 assert_eq!(output.headers.as_ref().unwrap().len(), 1);
1266 assert_eq!(
1267 output.headers.as_ref().unwrap().get("dynamic-key").unwrap(),
1268 "dynamic-value"
1269 );
1270 assert_eq!(output.payload, payload);
1271 assert!(output.base64);
1272
1273 let decoded = base64_engine.decode(&output.payload).unwrap();
1275 let payload_text = String::from_utf8(decoded).unwrap();
1276 assert_eq!(payload_text, "Dynamic payload");
1277 }
1278
1279 #[test]
1280 fn test_log_level_from_str() {
1281 assert_eq!(LogLevel::from_str("debug").unwrap(), LogLevel::Debug);
1282 assert_eq!(LogLevel::from_str("DEBUG").unwrap(), LogLevel::Debug);
1283 assert_eq!(LogLevel::from_str("info").unwrap(), LogLevel::Info);
1284 assert_eq!(LogLevel::from_str("INFO").unwrap(), LogLevel::Info);
1285 assert_eq!(LogLevel::from_str("warn").unwrap(), LogLevel::Warn);
1286 assert_eq!(LogLevel::from_str("warning").unwrap(), LogLevel::Warn);
1287 assert_eq!(LogLevel::from_str("WARN").unwrap(), LogLevel::Warn);
1288 assert_eq!(LogLevel::from_str("error").unwrap(), LogLevel::Error);
1289 assert_eq!(LogLevel::from_str("ERROR").unwrap(), LogLevel::Error);
1290
1291 assert!(LogLevel::from_str("invalid").is_err());
1292 }
1293
1294 #[test]
1295 fn test_log_level_display() {
1296 assert_eq!(LogLevel::Debug.to_string(), "DEBUG");
1297 assert_eq!(LogLevel::Info.to_string(), "INFO");
1298 assert_eq!(LogLevel::Warn.to_string(), "WARN");
1299 assert_eq!(LogLevel::Error.to_string(), "ERROR");
1300 }
1301
1302 #[test]
1303 #[serial]
1304 fn test_log_level_from_env() {
1305 std::env::set_var(env_vars::LOG_LEVEL, "debug");
1307 let exporter = OtlpStdoutSpanExporter::default();
1308 assert_eq!(exporter.level, Some(LogLevel::Debug));
1309
1310 std::env::set_var(env_vars::LOG_LEVEL, "invalid");
1312 let exporter = OtlpStdoutSpanExporter::default();
1313 assert_eq!(exporter.level, None);
1314
1315 std::env::remove_var(env_vars::LOG_LEVEL);
1317 let exporter = OtlpStdoutSpanExporter::builder()
1318 .level(LogLevel::Error)
1319 .build();
1320 assert_eq!(exporter.level, Some(LogLevel::Error));
1321
1322 std::env::set_var(env_vars::LOG_LEVEL, "warn");
1324 let exporter = OtlpStdoutSpanExporter::builder()
1325 .level(LogLevel::Error)
1326 .build();
1327 assert_eq!(exporter.level, Some(LogLevel::Warn));
1328
1329 std::env::remove_var(env_vars::LOG_LEVEL);
1331 }
1332
1333 #[tokio::test]
1334 #[serial]
1335 async fn test_log_level_in_output() {
1336 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1338 exporter.level = Some(LogLevel::Debug);
1339 let span = create_test_span();
1340
1341 let result = exporter.export(vec![span]).await;
1342 assert!(result.is_ok());
1343
1344 let output_lines = output.get_output();
1345 assert_eq!(output_lines.len(), 1);
1346
1347 let json: Value = serde_json::from_str(&output_lines[0]).unwrap();
1349 assert_eq!(json["level"], "DEBUG");
1350
1351 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1353 exporter.level = None;
1354 let span = create_test_span();
1355
1356 let result = exporter.export(vec![span]).await;
1357 assert!(result.is_ok());
1358
1359 let output_lines = output.get_output();
1360 assert_eq!(output_lines.len(), 1);
1361
1362 let json: Value = serde_json::from_str(&output_lines[0]).unwrap();
1364 assert!(!json.as_object().unwrap().contains_key("level"));
1365 }
1366
1367 #[test]
1368 fn test_stdout_output() {
1369 let output = create_output(false);
1370 assert!(format!("{:?}", output).contains("StdOutput"));
1372 }
1373
1374 #[test]
1375 fn test_pipe_output() {
1376 let output = create_output(true);
1377 let debug_str = format!("{:?}", output);
1379 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1380 }
1381
1382 #[test]
1383 fn test_env_var_precedence() {
1384 let temp_dir = std::env::temp_dir();
1386 let path = temp_dir.join("test_pipe");
1387
1388 std::env::remove_var(env_vars::OUTPUT_TYPE);
1390
1391 std::env::set_var(env_vars::OUTPUT_TYPE, "pipe");
1393
1394 let exporter = OtlpStdoutSpanExporter::default();
1396
1397 let debug_str = format!("{:?}", exporter.output);
1399 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1400
1401 std::env::remove_var(env_vars::OUTPUT_TYPE);
1403 if path.exists() {
1404 let _ = std::fs::remove_file(path);
1405 }
1406 }
1407
1408 #[test]
1409 fn test_constructor_precedence() {
1410 let temp_dir = std::env::temp_dir();
1412 let path = temp_dir.join("test_pipe");
1413
1414 std::env::remove_var(env_vars::OUTPUT_TYPE);
1416
1417 let exporter = OtlpStdoutSpanExporter::builder().pipe(true).build();
1419
1420 let debug_str = format!("{:?}", exporter.output);
1422 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1423
1424 if path.exists() {
1426 let _ = std::fs::remove_file(path);
1427 }
1428 }
1429
1430 #[test]
1431 fn test_env_var_overrides_constructor() {
1432 let temp_dir = std::env::temp_dir();
1434 let path = temp_dir.join("test_pipe");
1435
1436 std::env::set_var(env_vars::OUTPUT_TYPE, "pipe");
1438
1439 let exporter = OtlpStdoutSpanExporter::builder().pipe(false).build();
1441
1442 let debug_str = format!("{:?}", exporter.output);
1444 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1445
1446 std::env::remove_var(env_vars::OUTPUT_TYPE);
1448 if path.exists() {
1449 let _ = std::fs::remove_file(path);
1450 }
1451 }
1452}