1use base64::{engine::general_purpose::STANDARD as base64_engine, Engine};
118use bon::bon;
119use flate2::{write::GzEncoder, Compression};
120use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
121use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
122use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
123use opentelemetry_sdk::error::OTelSdkResult;
124use opentelemetry_sdk::resource::Resource;
125use opentelemetry_sdk::{
126 error::OTelSdkError,
127 trace::{SpanData, SpanExporter},
128};
129use prost::Message;
130use serde::{Deserialize, Serialize};
131use std::{
132 collections::HashMap,
133 env,
134 fmt::{self, Display},
135 fs::OpenOptions,
136 io::{self, Write},
137 path::PathBuf,
138 result::Result,
139 str::FromStr,
140 sync::{Arc, Mutex},
141};
142
143mod constants;
144use constants::{defaults, env_vars};
145
/// Public re-exports of the crate's constant modules.
///
/// Makes `defaults`, `env_vars` and `resource_attributes` from the private
/// `constants` module reachable under a single public path.
pub mod consts {
    pub use crate::constants::defaults;
    pub use crate::constants::env_vars;
    pub use crate::constants::resource_attributes;
}
157
/// Crate version captured from `CARGO_PKG_VERSION` at compile time; it is
/// written into the `__otel_otlp_stdout` field of every exported record.
const VERSION: &str = env!("CARGO_PKG_VERSION");
159
/// Severity label that can be attached to exported records.
///
/// The default variant is [`LogLevel::Info`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum LogLevel {
    /// Debug severity.
    Debug,
    /// Informational severity (the default).
    #[default]
    Info,
    /// Warning severity.
    Warn,
    /// Error severity.
    Error,
}
173
174impl FromStr for LogLevel {
175 type Err = String;
176
177 fn from_str(s: &str) -> Result<Self, Self::Err> {
178 match s.to_lowercase().as_str() {
179 "debug" => Ok(LogLevel::Debug),
180 "info" => Ok(LogLevel::Info),
181 "warn" | "warning" => Ok(LogLevel::Warn),
182 "error" => Ok(LogLevel::Error),
183 _ => Err(format!("Invalid log level: {s}")),
184 }
185 }
186}
187
188impl Display for LogLevel {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 match self {
191 LogLevel::Debug => write!(f, "DEBUG"),
192 LogLevel::Info => write!(f, "INFO"),
193 LogLevel::Warn => write!(f, "WARN"),
194 LogLevel::Error => write!(f, "ERROR"),
195 }
196 }
197}
198
/// Destination for the lines produced by the exporter.
///
/// Implementors must be `Send + Sync`, `Debug` and `Any`.
pub trait Output: Send + Sync + std::fmt::Debug + std::any::Any {
    /// Writes one line to the destination.
    ///
    /// # Errors
    /// Returns an [`OTelSdkError`] when the line cannot be written.
    fn write_line(&self, line: &str) -> Result<(), OTelSdkError>;

    /// Returns `true` when this output is a named pipe.
    fn is_pipe(&self) -> bool;

    /// Touches the pipe without writing a record.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn touch_pipe(&self) -> Result<(), OTelSdkError> {
        Ok(())
    }
}
236
/// [`Output`] implementation that writes to the process's standard output.
#[derive(Debug, Default)]
struct StdOutput;
240
241impl Output for StdOutput {
242 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
243 let stdout = io::stdout();
245 let mut handle = stdout.lock();
246
247 writeln!(handle, "{line}").map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
249
250 Ok(())
251 }
252
253 fn is_pipe(&self) -> bool {
254 false }
256}
257
/// [`Output`] implementation that writes to a named pipe on the file system.
#[derive(Debug)]
struct NamedPipeOutput {
    /// Location of the pipe; it is opened anew for every write.
    path: PathBuf,
}
263
264impl NamedPipeOutput {
265 fn new() -> Result<Self, OTelSdkError> {
266 let path_buf = PathBuf::from(defaults::PIPE_PATH);
267 if !path_buf.exists() {
268 log::warn!("Named pipe does not exist: {}", defaults::PIPE_PATH);
269 }
271
272 Ok(Self { path: path_buf })
273 }
274}
275
276impl Output for NamedPipeOutput {
277 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
278 let mut file = OpenOptions::new()
280 .write(true)
281 .open(&self.path)
282 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to open pipe: {e}")))?;
283
284 writeln!(file, "{line}")
286 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to write to pipe: {e}")))?;
287
288 Ok(())
289 }
290
291 fn is_pipe(&self) -> bool {
292 true }
294
295 fn touch_pipe(&self) -> Result<(), OTelSdkError> {
296 let _file = OpenOptions::new()
298 .write(true)
299 .open(&self.path)
300 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to touch pipe: {e}")))?;
301 Ok(())
302 }
303}
304
/// [`Output`] implementation that collects lines in memory.
///
/// Clones share the same underlying buffer because the storage sits behind
/// an `Arc`.
#[derive(Clone, Default)]
pub struct BufferOutput {
    /// Lines written so far, in write order.
    buffer: Arc<Mutex<Vec<String>>>,
}
310
311impl BufferOutput {
312 pub fn new() -> Self {
314 Self::default()
315 }
316
317 pub fn take_lines(&self) -> Result<Vec<String>, OTelSdkError> {
319 let mut guard = self.buffer.lock().map_err(|e| {
320 OTelSdkError::InternalFailure(format!(
321 "Failed to lock buffer mutex for take_lines: {e}"
322 ))
323 })?;
324 Ok(std::mem::take(&mut *guard)) }
326}
327
328impl Output for BufferOutput {
329 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
330 let mut guard = self.buffer.lock().map_err(|e| {
331 OTelSdkError::InternalFailure(format!(
332 "Failed to lock buffer mutex for write_line: {e}"
333 ))
334 })?;
335 guard.push(line.to_string());
336 Ok(())
337 }
338
339 fn is_pipe(&self) -> bool {
340 false }
342}
343
344impl fmt::Debug for BufferOutput {
346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347 f.debug_struct("BufferOutput").finish_non_exhaustive()
349 }
350}
351
352fn create_output(use_pipe: bool) -> Arc<dyn Output> {
354 if use_pipe {
355 match NamedPipeOutput::new() {
356 Ok(output) => Arc::new(output),
357 Err(e) => {
358 log::warn!("Failed to create named pipe output: {e}, falling back to stdout");
359 Arc::new(StdOutput)
360 }
361 }
362 } else {
363 Arc::new(StdOutput)
364 }
365}
366
/// One exported record, serialised as a single JSON line.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExporterOutput {
    /// Version marker; appears in JSON under the key `__otel_otlp_stdout`.
    #[serde(rename = "__otel_otlp_stdout")]
    pub version: String,
    /// Name of the service that produced the record.
    pub source: String,
    /// Endpoint associated with the record.
    pub endpoint: String,
    /// HTTP method associated with the record.
    pub method: String,
    /// Payload content type; appears in JSON as `content-type`.
    #[serde(rename = "content-type")]
    pub content_type: String,
    /// Payload content encoding; appears in JSON as `content-encoding`.
    #[serde(rename = "content-encoding")]
    pub content_encoding: String,
    /// Optional extra headers; omitted from the JSON when `None` or empty.
    #[serde(skip_serializing_if = "ExporterOutput::is_headers_empty")]
    pub headers: Option<HashMap<String, String>>,
    /// The payload text.
    pub payload: String,
    /// Whether `payload` is base64-encoded.
    pub base64: bool,
    /// Optional level label; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
}
399
400impl ExporterOutput {
401 fn is_headers_empty(headers: &Option<HashMap<String, String>>) -> bool {
403 headers.as_ref().map_or(true, |h| h.is_empty())
404 }
405}
406
/// Span exporter that writes each batch as one JSON line to an [`Output`].
#[derive(Debug)]
pub struct OtlpStdoutSpanExporter {
    /// Gzip compression level used when encoding a batch.
    compression_level: u8,
    /// Resource attached to exported spans; an empty resource is used when `None`.
    resource: Option<Resource>,
    /// Extra headers copied into every record.
    headers: Option<HashMap<String, String>>,
    /// Destination that receives the JSON lines.
    output: Arc<dyn Output>,
    /// Optional level label copied into every record.
    level: Option<LogLevel>,
}
443
444impl Default for OtlpStdoutSpanExporter {
445 fn default() -> Self {
446 Self::builder().build()
447 }
448}
449#[bon]
450impl OtlpStdoutSpanExporter {
451 #[builder]
471 pub fn new(
472 compression_level: Option<u8>,
473 resource: Option<Resource>,
474 headers: Option<HashMap<String, String>>,
475 output: Option<Arc<dyn Output>>,
476 level: Option<LogLevel>,
477 pipe: Option<bool>,
478 ) -> Self {
479 let compression_level = match env::var(env_vars::COMPRESSION_LEVEL) {
481 Ok(value) => match value.parse::<u8>() {
482 Ok(level) if level <= 9 => level,
483 Ok(level) => {
484 log::warn!(
485 "Invalid value in {}: {} (must be 0-9), using fallback",
486 env_vars::COMPRESSION_LEVEL,
487 level
488 );
489 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
490 }
491 Err(_) => {
492 log::warn!(
493 "Failed to parse {}: {}, using fallback",
494 env_vars::COMPRESSION_LEVEL,
495 value
496 );
497 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
498 }
499 },
500 Err(_) => {
501 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
503 }
504 };
505
506 let headers = match headers {
508 Some(constructor_headers) => {
509 if let Some(env_headers) = Self::parse_headers() {
510 let mut merged = constructor_headers;
512 merged.extend(env_headers);
513 Some(merged)
514 } else {
515 Some(constructor_headers)
517 }
518 }
519 None => Self::parse_headers(), };
521
522 let level = match env::var(env_vars::LOG_LEVEL) {
524 Ok(value) => match LogLevel::from_str(&value) {
525 Ok(log_level) => Some(log_level),
526 Err(e) => {
527 log::warn!(
528 "Invalid log level in {}: {}, using fallback",
529 env_vars::LOG_LEVEL,
530 e
531 );
532 level
533 }
534 },
535 Err(_) => {
536 level
538 }
539 };
540
541 let use_pipe = match env::var(env_vars::OUTPUT_TYPE) {
543 Ok(value) => value.to_lowercase() == "pipe",
544 Err(_) => pipe.unwrap_or(false),
545 };
546
547 let output = output.unwrap_or_else(|| create_output(use_pipe));
549
550 Self {
551 compression_level,
552 resource,
553 headers,
554 output,
555 level,
556 }
557 }
558
559 fn get_service_name() -> String {
567 env::var(env_vars::SERVICE_NAME)
568 .or_else(|_| env::var(env_vars::AWS_LAMBDA_FUNCTION_NAME))
569 .unwrap_or_else(|_| defaults::SERVICE_NAME.to_string())
570 }
571
572 #[cfg(test)]
573 fn with_test_output() -> (Self, Arc<TestOutput>) {
574 let output = Arc::new(TestOutput::new());
575
576 let exporter = Self::builder().output(output.clone()).build();
578
579 (exporter, output)
580 }
581
582 fn parse_headers() -> Option<HashMap<String, String>> {
587 let get_headers = |var_name: &str| -> Option<HashMap<String, String>> {
589 env::var(var_name).ok().map(|header_str| {
590 let mut map = HashMap::new();
591 Self::parse_header_string(&header_str, &mut map);
592 map
593 })
594 };
595
596 let global_headers = get_headers("OTEL_EXPORTER_OTLP_HEADERS");
598 let trace_headers = get_headers("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
599
600 if global_headers.is_none() && trace_headers.is_none() {
602 return None;
603 }
604
605 let mut result = HashMap::new();
607
608 if let Some(headers) = global_headers {
610 result.extend(headers);
611 }
612
613 if let Some(headers) = trace_headers {
615 result.extend(headers);
616 }
617
618 if result.is_empty() {
620 None
621 } else {
622 Some(result)
623 }
624 }
625
626 fn parse_header_string(header_str: &str, headers: &mut HashMap<String, String>) {
633 for pair in header_str.split(',') {
634 if let Some((key, value)) = pair.split_once('=') {
635 let key = key.trim().to_lowercase();
636 if key != "content-type" && key != "content-encoding" {
638 headers.insert(key, value.trim().to_string());
639 }
640 }
641 }
642 }
643}
644
645impl SpanExporter for OtlpStdoutSpanExporter {
646 fn export(
663 &self,
664 batch: Vec<SpanData>,
665 ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
666 if batch.is_empty() && self.output.is_pipe() {
668 let touch_result = self.output.touch_pipe();
670 return Box::pin(std::future::ready(touch_result));
671 }
672
673 let result = (|| {
675 let resource = self
677 .resource
678 .clone()
679 .unwrap_or_else(|| opentelemetry_sdk::Resource::builder_empty().build());
680 let resource_attrs = ResourceAttributesWithSchema::from(&resource);
681 let resource_spans = group_spans_by_resource_and_scope(batch, &resource_attrs);
682 let request = ExportTraceServiceRequest { resource_spans };
683
684 let proto_bytes = request.encode_to_vec();
686
687 let mut encoder =
689 GzEncoder::new(Vec::new(), Compression::new(self.compression_level as u32));
690 encoder
691 .write_all(&proto_bytes)
692 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
693 let compressed_bytes = encoder
694 .finish()
695 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
696
697 let payload = base64_engine.encode(compressed_bytes);
699
700 let output_data = ExporterOutput {
702 version: VERSION.to_string(),
703 source: Self::get_service_name(),
704 endpoint: defaults::ENDPOINT.to_string(),
705 method: "POST".to_string(),
706 content_type: "application/x-protobuf".to_string(),
707 content_encoding: "gzip".to_string(),
708 headers: self.headers.clone(),
709 payload,
710 base64: true,
711 level: self.level.map(|l| l.to_string()),
712 };
713
714 self.output.write_line(
716 &serde_json::to_string(&output_data)
717 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?,
718 )?;
719
720 Ok(())
721 })();
722
723 Box::pin(std::future::ready(result))
725 }
726
    /// No-op shutdown: `_timeout` is ignored and `Ok(())` is always returned.
    fn shutdown_with_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), OTelSdkError> {
        Ok(())
    }
738
    /// No-op flush: always returns `Ok(())`.
    fn force_flush(&mut self) -> Result<(), OTelSdkError> {
        Ok(())
    }
749
750 fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
760 self.resource = Some(<opentelemetry_sdk::Resource as Into<Resource>>::into(
761 resource.clone(),
762 ));
763 }
764}
765
// The items below exist only under `cfg(doctest)`: they feed `../README.md`
// to the `doctest!` macro from the `doc_comment` crate under the name `readme`.
#[cfg(doctest)]
#[macro_use]
extern crate doc_comment;

#[cfg(doctest)]
use doc_comment::doctest;

#[cfg(doctest)]
doctest!("../README.md", readme);
775
/// Test-only [`Output`] that records every written line in memory.
#[cfg(test)]
#[derive(Debug, Default)]
struct TestOutput {
    /// Lines written so far, in write order.
    buffer: Arc<Mutex<Vec<String>>>,
}
782
#[cfg(test)]
impl TestOutput {
    /// Creates an empty recorder.
    fn new() -> Self {
        Self::default()
    }

    /// Returns a copy of every line recorded so far.
    ///
    /// Panics if the buffer mutex is poisoned.
    fn get_output(&self) -> Vec<String> {
        let lines = self.buffer.lock().unwrap();
        lines.to_vec()
    }
}
795
#[cfg(test)]
impl Output for TestOutput {
    /// Records a copy of `line`; panics if the buffer mutex is poisoned.
    fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
        let mut lines = self.buffer.lock().unwrap();
        lines.push(line.to_owned());
        Ok(())
    }

    /// The test recorder is never a pipe output.
    fn is_pipe(&self) -> bool {
        false
    }
}
809
810#[cfg(test)]
811mod tests {
812 use super::*;
813 use opentelemetry::{
814 trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
815 InstrumentationScope, KeyValue,
816 };
817 use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks};
818 use serde_json::Value;
819 use serial_test::serial;
820 use std::time::SystemTime;
821
822 fn create_test_span() -> SpanData {
823 let trace_id_bytes = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42];
824 let span_id_bytes = [0, 0, 0, 0, 0, 0, 0, 123];
825 let parent_id_bytes = [0, 0, 0, 0, 0, 0, 0, 42];
826
827 let span_context = SpanContext::new(
828 TraceId::from_bytes(trace_id_bytes),
829 SpanId::from_bytes(span_id_bytes),
830 TraceFlags::default(),
831 false,
832 TraceState::default(),
833 );
834
835 SpanData {
836 span_context,
837 parent_span_id: SpanId::from_bytes(parent_id_bytes),
838 span_kind: SpanKind::Client,
839 name: "test-span".into(),
840 start_time: SystemTime::UNIX_EPOCH,
841 end_time: SystemTime::UNIX_EPOCH,
842 attributes: vec![KeyValue::new("test.key", "test-value")],
843 dropped_attributes_count: 0,
844 events: SpanEvents::default(),
845 links: SpanLinks::default(),
846 status: Status::Ok,
847 instrumentation_scope: InstrumentationScope::builder("test-library")
848 .with_version("1.0.0")
849 .with_schema_url("https://opentelemetry.io/schema/1.0.0")
850 .build(),
851 }
852 }
853
854 #[test]
855 fn test_parse_headers() {
856 std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", "key1=value1,key2=value2");
857 std::env::set_var(
858 "OTEL_EXPORTER_OTLP_TRACES_HEADERS",
859 "key2=override,key3=value3",
860 );
861
862 let headers = OtlpStdoutSpanExporter::parse_headers();
863
864 assert!(headers.is_some());
866 let headers = headers.unwrap();
867
868 assert_eq!(headers.get("key1").unwrap(), "value1");
869 assert_eq!(headers.get("key2").unwrap(), "override");
870 assert_eq!(headers.get("key3").unwrap(), "value3");
871
872 std::env::remove_var("OTEL_EXPORTER_OTLP_HEADERS");
874 std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
875 }
876
877 #[test]
878 fn test_service_name_resolution() {
879 std::env::set_var(env_vars::SERVICE_NAME, "otel-service");
881 std::env::set_var(env_vars::AWS_LAMBDA_FUNCTION_NAME, "lambda-function");
882 assert_eq!(OtlpStdoutSpanExporter::get_service_name(), "otel-service");
883
884 std::env::remove_var(env_vars::SERVICE_NAME);
886 assert_eq!(
887 OtlpStdoutSpanExporter::get_service_name(),
888 "lambda-function"
889 );
890
891 std::env::remove_var(env_vars::AWS_LAMBDA_FUNCTION_NAME);
893 assert_eq!(
894 OtlpStdoutSpanExporter::get_service_name(),
895 defaults::SERVICE_NAME
896 );
897 }
898
899 #[test]
900 fn test_compression_level_precedence() {
901 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
903 let exporter = OtlpStdoutSpanExporter::builder()
904 .compression_level(7)
905 .build();
906 assert_eq!(exporter.compression_level, 3);
907
908 std::env::set_var(env_vars::COMPRESSION_LEVEL, "invalid");
910 let exporter = OtlpStdoutSpanExporter::builder()
911 .compression_level(7)
912 .build();
913 assert_eq!(exporter.compression_level, 7);
914
915 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
917 let exporter = OtlpStdoutSpanExporter::builder()
918 .compression_level(7)
919 .build();
920 assert_eq!(exporter.compression_level, 7);
921
922 let exporter = OtlpStdoutSpanExporter::builder()
924 .compression_level(defaults::COMPRESSION_LEVEL)
925 .build();
926 assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
927 }
928
929 #[test]
930 fn test_new_uses_env_compression_level() {
931 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
933 let exporter = OtlpStdoutSpanExporter::default();
934 assert_eq!(exporter.compression_level, 3);
935
936 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
938 let exporter = OtlpStdoutSpanExporter::default();
939 assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
940 }
941
942 #[tokio::test]
943 #[serial]
944 async fn test_compression_level_affects_output_size() {
945 let mut spans = Vec::new();
947 for i in 0..100 {
948 let mut span = create_test_span();
949 span.attributes.push(KeyValue::new("index", i));
951 span.attributes
953 .push(KeyValue::new("data", "a".repeat(1000)));
954 spans.push(span);
955 }
956
957 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
959
960 let no_compression_output = Arc::new(TestOutput::new());
962 let no_compression_exporter = OtlpStdoutSpanExporter {
963 compression_level: 0,
964 resource: None,
965 output: no_compression_output.clone() as Arc<dyn Output>,
966 headers: None,
967 level: None,
968 };
969 let _ = no_compression_exporter.export(spans.clone()).await;
970 let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
971
972 let max_compression_output = Arc::new(TestOutput::new());
974 let max_compression_exporter = OtlpStdoutSpanExporter {
975 compression_level: 9,
976 resource: None,
977 output: max_compression_output.clone() as Arc<dyn Output>,
978 headers: None,
979 level: None,
980 };
981 let _ = max_compression_exporter.export(spans.clone()).await;
982 let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
983
984 assert!(no_compression_size > max_compression_size,
986 "Maximum compression (level 9) should produce output no larger than no compression (level 0). Got {} vs {}",
987 max_compression_size, no_compression_size);
988
989 let no_compression_spans = decode_and_count_spans(&no_compression_output.get_output()[0]);
991 let max_compression_spans = decode_and_count_spans(&max_compression_output.get_output()[0]);
992
993 assert_eq!(
994 no_compression_spans,
995 spans.len(),
996 "No compression output should contain all spans"
997 );
998 assert_eq!(
999 max_compression_spans,
1000 spans.len(),
1001 "Maximum compression output should contain all spans"
1002 );
1003 }
1004
1005 fn extract_payload_size(json_str: &str) -> usize {
1007 let json: Value = serde_json::from_str(json_str).unwrap();
1008 let payload = json["payload"].as_str().unwrap();
1009 base64_engine.decode(payload).unwrap().len()
1010 }
1011
1012 fn decode_and_count_spans(json_str: &str) -> usize {
1014 let json: Value = serde_json::from_str(json_str).unwrap();
1015 let payload = json["payload"].as_str().unwrap();
1016 let decoded = base64_engine.decode(payload).unwrap();
1017
1018 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
1019 let mut decompressed = Vec::new();
1020 std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();
1021
1022 let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
1023
1024 let mut span_count = 0;
1026 for resource_span in &request.resource_spans {
1027 for scope_span in &resource_span.scope_spans {
1028 span_count += scope_span.spans.len();
1029 }
1030 }
1031
1032 span_count
1033 }
1034
1035 #[tokio::test]
1036 async fn test_export_single_span() {
1037 let (exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1038 let span = create_test_span();
1039
1040 let result = exporter.export(vec![span]).await;
1041 assert!(result.is_ok());
1042
1043 let output = output.get_output();
1044 assert_eq!(output.len(), 1);
1045
1046 let json: Value = serde_json::from_str(&output[0]).unwrap();
1048 assert_eq!(json["__otel_otlp_stdout"], VERSION);
1049 assert_eq!(json["method"], "POST");
1050 assert_eq!(json["content-type"], "application/x-protobuf");
1051 assert_eq!(json["content-encoding"], "gzip");
1052 assert_eq!(json["base64"], true);
1053
1054 let payload = json["payload"].as_str().unwrap();
1056 let decoded = base64_engine.decode(payload).unwrap();
1057
1058 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
1060 let mut decompressed = Vec::new();
1061 std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();
1062
1063 let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
1065 assert_eq!(request.resource_spans.len(), 1);
1066 }
1067
1068 #[tokio::test]
1069 async fn test_export_empty_batch() {
1070 let exporter = OtlpStdoutSpanExporter::default();
1071 let result = exporter.export(vec![]).await;
1072 assert!(result.is_ok());
1073 }
1074
1075 #[test]
1076 #[serial]
1077 fn test_gzip_level_configuration() {
1078 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1080
1081 let exporter = OtlpStdoutSpanExporter::builder()
1083 .compression_level(9)
1084 .build();
1085 assert_eq!(exporter.compression_level, 9);
1086 }
1087
1088 #[tokio::test]
1089 #[serial]
1090 async fn test_env_var_affects_export_compression() {
1091 let span = create_test_span();
1093 let mut spans = Vec::new();
1094 for i in 0..100 {
1096 let mut span = span.clone();
1097 span.attributes
1099 .push(KeyValue::new(format!("test-key-{}", i), "a".repeat(1000)));
1100 spans.push(span);
1101 }
1102
1103 std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
1105 let no_compression_output = Arc::new(TestOutput::new());
1106 let mut no_compression_exporter = OtlpStdoutSpanExporter::builder()
1107 .compression_level(0)
1108 .build();
1109 no_compression_exporter.output = no_compression_output.clone() as Arc<dyn Output>;
1110 let _ = no_compression_exporter.export(spans.clone()).await;
1111 let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
1112
1113 std::env::set_var(env_vars::COMPRESSION_LEVEL, "9");
1115 let max_compression_output = Arc::new(TestOutput::new());
1116 let mut max_compression_exporter = OtlpStdoutSpanExporter::builder()
1117 .compression_level(9)
1118 .build();
1119 max_compression_exporter.output = max_compression_output.clone() as Arc<dyn Output>;
1120 let _ = max_compression_exporter.export(spans.clone()).await;
1121 let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
1122
1123 assert!(no_compression_size > max_compression_size,
1125 "Environment variable COMPRESSION_LEVEL=9 should produce smaller output than COMPRESSION_LEVEL=0. Got {} vs {}",
1126 max_compression_size, no_compression_size);
1127
1128 std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
1130 let explicit_output = Arc::new(TestOutput::new());
1131
1132 let explicit_exporter = OtlpStdoutSpanExporter::builder()
1134 .output(explicit_output.clone())
1135 .build();
1136
1137 let _ = explicit_exporter.export(spans.clone()).await;
1139 let explicit_size = extract_payload_size(&explicit_output.get_output()[0]);
1140
1141 assert!(explicit_size > max_compression_size,
1144 "Environment variable should take precedence over explicitly set level. Expected size closer to {} but got {}",
1145 no_compression_size, explicit_size);
1146
1147 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1149 }
1150
1151 #[tokio::test]
1152 #[serial]
1153 async fn test_environment_variable_precedence() {
1154 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
1156
1157 let exporter = OtlpStdoutSpanExporter::builder()
1160 .compression_level(9)
1161 .build();
1162 assert_eq!(exporter.compression_level, 3);
1163
1164 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1166 let exporter = OtlpStdoutSpanExporter::builder()
1167 .compression_level(9)
1168 .build();
1169 assert_eq!(exporter.compression_level, 9);
1170 }
1171
1172 #[test]
1173 fn test_exporter_output_deserialization() {
1174 let json_str = r#"{
1176 "__otel_otlp_stdout": "0.11.1",
1177 "source": "test-service",
1178 "endpoint": "http://localhost:4318/v1/traces",
1179 "method": "POST",
1180 "content-type": "application/x-protobuf",
1181 "content-encoding": "gzip",
1182 "headers": {
1183 "api-key": "test-key",
1184 "custom-header": "test-value"
1185 },
1186 "payload": "SGVsbG8gd29ybGQ=",
1187 "base64": true
1188 }"#;
1189
1190 let output: ExporterOutput = serde_json::from_str(json_str).unwrap();
1192
1193 assert_eq!(output.version, "0.11.1");
1195 assert_eq!(output.source, "test-service");
1196 assert_eq!(output.endpoint, "http://localhost:4318/v1/traces");
1197 assert_eq!(output.method, "POST");
1198 assert_eq!(output.content_type, "application/x-protobuf");
1199 assert_eq!(output.content_encoding, "gzip");
1200 assert_eq!(output.headers.as_ref().unwrap().len(), 2);
1201 assert_eq!(
1202 output.headers.as_ref().unwrap().get("api-key").unwrap(),
1203 "test-key"
1204 );
1205 assert_eq!(
1206 output
1207 .headers
1208 .as_ref()
1209 .unwrap()
1210 .get("custom-header")
1211 .unwrap(),
1212 "test-value"
1213 );
1214 assert_eq!(output.payload, "SGVsbG8gd29ybGQ=");
1215 assert!(output.base64);
1216
1217 let decoded = base64_engine.decode(&output.payload).unwrap();
1219 let payload_text = String::from_utf8(decoded).unwrap();
1220 assert_eq!(payload_text, "Hello world");
1221 }
1222
1223 #[test]
1224 fn test_exporter_output_deserialization_dynamic() {
1225 let version = "0.11.1".to_string();
1227 let service = "dynamic-service".to_string();
1228 let payload = base64_engine.encode("Dynamic payload");
1229
1230 let json_str = format!(
1232 r#"{{
1233 "__otel_otlp_stdout": "{}",
1234 "source": "{}",
1235 "endpoint": "http://localhost:4318/v1/traces",
1236 "method": "POST",
1237 "content-type": "application/x-protobuf",
1238 "content-encoding": "gzip",
1239 "headers": {{
1240 "dynamic-key": "dynamic-value"
1241 }},
1242 "payload": "{}",
1243 "base64": true
1244 }}"#,
1245 version, service, payload
1246 );
1247
1248 let output: ExporterOutput = serde_json::from_str(&json_str).unwrap();
1250
1251 assert_eq!(output.version, version);
1253 assert_eq!(output.source, service);
1254 assert_eq!(output.endpoint, "http://localhost:4318/v1/traces");
1255 assert_eq!(output.method, "POST");
1256 assert_eq!(output.content_type, "application/x-protobuf");
1257 assert_eq!(output.content_encoding, "gzip");
1258 assert_eq!(output.headers.as_ref().unwrap().len(), 1);
1259 assert_eq!(
1260 output.headers.as_ref().unwrap().get("dynamic-key").unwrap(),
1261 "dynamic-value"
1262 );
1263 assert_eq!(output.payload, payload);
1264 assert!(output.base64);
1265
1266 let decoded = base64_engine.decode(&output.payload).unwrap();
1268 let payload_text = String::from_utf8(decoded).unwrap();
1269 assert_eq!(payload_text, "Dynamic payload");
1270 }
1271
1272 #[test]
1273 fn test_log_level_from_str() {
1274 assert_eq!(LogLevel::from_str("debug").unwrap(), LogLevel::Debug);
1275 assert_eq!(LogLevel::from_str("DEBUG").unwrap(), LogLevel::Debug);
1276 assert_eq!(LogLevel::from_str("info").unwrap(), LogLevel::Info);
1277 assert_eq!(LogLevel::from_str("INFO").unwrap(), LogLevel::Info);
1278 assert_eq!(LogLevel::from_str("warn").unwrap(), LogLevel::Warn);
1279 assert_eq!(LogLevel::from_str("warning").unwrap(), LogLevel::Warn);
1280 assert_eq!(LogLevel::from_str("WARN").unwrap(), LogLevel::Warn);
1281 assert_eq!(LogLevel::from_str("error").unwrap(), LogLevel::Error);
1282 assert_eq!(LogLevel::from_str("ERROR").unwrap(), LogLevel::Error);
1283
1284 assert!(LogLevel::from_str("invalid").is_err());
1285 }
1286
1287 #[test]
1288 fn test_log_level_display() {
1289 assert_eq!(LogLevel::Debug.to_string(), "DEBUG");
1290 assert_eq!(LogLevel::Info.to_string(), "INFO");
1291 assert_eq!(LogLevel::Warn.to_string(), "WARN");
1292 assert_eq!(LogLevel::Error.to_string(), "ERROR");
1293 }
1294
1295 #[test]
1296 #[serial]
1297 fn test_log_level_from_env() {
1298 std::env::set_var(env_vars::LOG_LEVEL, "debug");
1300 let exporter = OtlpStdoutSpanExporter::default();
1301 assert_eq!(exporter.level, Some(LogLevel::Debug));
1302
1303 std::env::set_var(env_vars::LOG_LEVEL, "invalid");
1305 let exporter = OtlpStdoutSpanExporter::default();
1306 assert_eq!(exporter.level, None);
1307
1308 std::env::remove_var(env_vars::LOG_LEVEL);
1310 let exporter = OtlpStdoutSpanExporter::builder()
1311 .level(LogLevel::Error)
1312 .build();
1313 assert_eq!(exporter.level, Some(LogLevel::Error));
1314
1315 std::env::set_var(env_vars::LOG_LEVEL, "warn");
1317 let exporter = OtlpStdoutSpanExporter::builder()
1318 .level(LogLevel::Error)
1319 .build();
1320 assert_eq!(exporter.level, Some(LogLevel::Warn));
1321
1322 std::env::remove_var(env_vars::LOG_LEVEL);
1324 }
1325
1326 #[tokio::test]
1327 #[serial]
1328 async fn test_log_level_in_output() {
1329 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1331 exporter.level = Some(LogLevel::Debug);
1332 let span = create_test_span();
1333
1334 let result = exporter.export(vec![span]).await;
1335 assert!(result.is_ok());
1336
1337 let output_lines = output.get_output();
1338 assert_eq!(output_lines.len(), 1);
1339
1340 let json: Value = serde_json::from_str(&output_lines[0]).unwrap();
1342 assert_eq!(json["level"], "DEBUG");
1343
1344 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1346 exporter.level = None;
1347 let span = create_test_span();
1348
1349 let result = exporter.export(vec![span]).await;
1350 assert!(result.is_ok());
1351
1352 let output_lines = output.get_output();
1353 assert_eq!(output_lines.len(), 1);
1354
1355 let json: Value = serde_json::from_str(&output_lines[0]).unwrap();
1357 assert!(!json.as_object().unwrap().contains_key("level"));
1358 }
1359
1360 #[test]
1361 fn test_stdout_output() {
1362 let output = create_output(false);
1363 assert!(format!("{:?}", output).contains("StdOutput"));
1365 }
1366
1367 #[test]
1368 fn test_pipe_output() {
1369 let output = create_output(true);
1370 let debug_str = format!("{:?}", output);
1372 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1373 }
1374
1375 #[test]
1376 fn test_env_var_precedence() {
1377 let temp_dir = std::env::temp_dir();
1379 let path = temp_dir.join("test_pipe");
1380
1381 std::env::remove_var(env_vars::OUTPUT_TYPE);
1383
1384 std::env::set_var(env_vars::OUTPUT_TYPE, "pipe");
1386
1387 let exporter = OtlpStdoutSpanExporter::default();
1389
1390 let debug_str = format!("{:?}", exporter.output);
1392 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1393
1394 std::env::remove_var(env_vars::OUTPUT_TYPE);
1396 if path.exists() {
1397 let _ = std::fs::remove_file(path);
1398 }
1399 }
1400
1401 #[test]
1402 fn test_constructor_precedence() {
1403 let temp_dir = std::env::temp_dir();
1405 let path = temp_dir.join("test_pipe");
1406
1407 std::env::remove_var(env_vars::OUTPUT_TYPE);
1409
1410 let exporter = OtlpStdoutSpanExporter::builder().pipe(true).build();
1412
1413 let debug_str = format!("{:?}", exporter.output);
1415 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1416
1417 if path.exists() {
1419 let _ = std::fs::remove_file(path);
1420 }
1421 }
1422
1423 #[test]
1424 fn test_env_var_overrides_constructor() {
1425 let temp_dir = std::env::temp_dir();
1427 let path = temp_dir.join("test_pipe");
1428
1429 std::env::set_var(env_vars::OUTPUT_TYPE, "pipe");
1431
1432 let exporter = OtlpStdoutSpanExporter::builder().pipe(false).build();
1434
1435 let debug_str = format!("{:?}", exporter.output);
1437 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1438
1439 std::env::remove_var(env_vars::OUTPUT_TYPE);
1441 if path.exists() {
1442 let _ = std::fs::remove_file(path);
1443 }
1444 }
1445}