1use base64::{engine::general_purpose::STANDARD as base64_engine, Engine};
118use bon::bon;
119use flate2::{write::GzEncoder, Compression};
120use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
121use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
122use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
123use opentelemetry_sdk::error::OTelSdkResult;
124use opentelemetry_sdk::resource::Resource;
125use opentelemetry_sdk::{
126 error::OTelSdkError,
127 trace::{SpanData, SpanExporter},
128};
129use prost::Message;
130use serde::{Deserialize, Serialize};
131use std::{
132 collections::HashMap,
133 env,
134 fmt::{self, Display},
135 fs::OpenOptions,
136 io::{self, Write},
137 path::PathBuf,
138 result::Result,
139 str::FromStr,
140 sync::{Arc, Mutex},
141};
142
143mod constants;
144use constants::{defaults, env_vars};
145
146pub mod consts {
148 pub use crate::constants::defaults;
154 pub use crate::constants::env_vars;
155 pub use crate::constants::resource_attributes;
156}
157
158const VERSION: &str = env!("CARGO_PKG_VERSION");
159
160#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
162pub enum LogLevel {
163 Debug,
165 #[default]
167 Info,
168 Warn,
170 Error,
172}
173
174impl FromStr for LogLevel {
175 type Err = String;
176
177 fn from_str(s: &str) -> Result<Self, Self::Err> {
178 match s.to_lowercase().as_str() {
179 "debug" => Ok(LogLevel::Debug),
180 "info" => Ok(LogLevel::Info),
181 "warn" | "warning" => Ok(LogLevel::Warn),
182 "error" => Ok(LogLevel::Error),
183 _ => Err(format!("Invalid log level: {s}")),
184 }
185 }
186}
187
188impl Display for LogLevel {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 match self {
191 LogLevel::Debug => write!(f, "DEBUG"),
192 LogLevel::Info => write!(f, "INFO"),
193 LogLevel::Warn => write!(f, "WARN"),
194 LogLevel::Error => write!(f, "ERROR"),
195 }
196 }
197}
198
199pub trait Output: Send + Sync + std::fmt::Debug + std::any::Any {
204 fn write_line(&self, line: &str) -> Result<(), OTelSdkError>;
214
215 fn is_pipe(&self) -> bool;
220
221 fn touch_pipe(&self) -> Result<(), OTelSdkError> {
232 Ok(())
234 }
235}
236
237#[derive(Debug, Default)]
239struct StdOutput;
240
241impl Output for StdOutput {
242 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
243 let stdout = io::stdout();
245 let mut handle = stdout.lock();
246
247 writeln!(handle, "{line}").map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
249
250 Ok(())
251 }
252
253 fn is_pipe(&self) -> bool {
254 false }
256}
257
258#[derive(Debug)]
260struct NamedPipeOutput {
261 path: PathBuf,
262}
263
264impl NamedPipeOutput {
265 fn new() -> Result<Self, OTelSdkError> {
266 let path_buf = PathBuf::from(defaults::PIPE_PATH);
267 if !path_buf.exists() {
268 log::warn!("Named pipe does not exist: {}", defaults::PIPE_PATH);
269 }
271
272 Ok(Self { path: path_buf })
273 }
274}
275
276impl Output for NamedPipeOutput {
277 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
278 let mut file = OpenOptions::new()
280 .write(true)
281 .open(&self.path)
282 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to open pipe: {e}")))?;
283
284 writeln!(file, "{line}")
286 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to write to pipe: {e}")))?;
287
288 Ok(())
289 }
290
291 fn is_pipe(&self) -> bool {
292 true }
294
295 fn touch_pipe(&self) -> Result<(), OTelSdkError> {
296 let _file = OpenOptions::new()
298 .write(true)
299 .open(&self.path)
300 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to touch pipe: {e}")))?;
301 Ok(())
302 }
303}
304
305#[derive(Clone, Default)]
307pub struct BufferOutput {
308 buffer: Arc<Mutex<Vec<String>>>,
309}
310
311impl BufferOutput {
312 pub fn new() -> Self {
314 Self::default()
315 }
316
317 pub fn take_lines(&self) -> Result<Vec<String>, OTelSdkError> {
319 let mut guard = self.buffer.lock().map_err(|e| {
320 OTelSdkError::InternalFailure(format!(
321 "Failed to lock buffer mutex for take_lines: {e}"
322 ))
323 })?;
324 Ok(std::mem::take(&mut *guard)) }
326}
327
328impl Output for BufferOutput {
329 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
330 let mut guard = self.buffer.lock().map_err(|e| {
331 OTelSdkError::InternalFailure(format!(
332 "Failed to lock buffer mutex for write_line: {e}"
333 ))
334 })?;
335 guard.push(line.to_string());
336 Ok(())
337 }
338
339 fn is_pipe(&self) -> bool {
340 false }
342}
343
344impl fmt::Debug for BufferOutput {
346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347 f.debug_struct("BufferOutput").finish_non_exhaustive()
349 }
350}
351
352fn create_output(use_pipe: bool) -> Arc<dyn Output> {
354 if use_pipe {
355 match NamedPipeOutput::new() {
356 Ok(output) => Arc::new(output),
357 Err(e) => {
358 log::warn!("Failed to create named pipe output: {e}, falling back to stdout");
359 Arc::new(StdOutput)
360 }
361 }
362 } else {
363 Arc::new(StdOutput)
364 }
365}
366
367#[derive(Debug, Serialize, Deserialize)]
372pub struct ExporterOutput {
373 #[serde(rename = "__otel_otlp_stdout")]
375 pub version: String,
376 pub source: String,
378 pub endpoint: String,
380 pub method: String,
382 #[serde(rename = "content-type")]
384 pub content_type: String,
385 #[serde(rename = "content-encoding")]
387 pub content_encoding: String,
388 #[serde(skip_serializing_if = "ExporterOutput::is_headers_empty")]
390 pub headers: Option<HashMap<String, String>>,
391 pub payload: String,
393 pub base64: bool,
395 #[serde(skip_serializing_if = "Option::is_none")]
397 pub level: Option<String>,
398}
399
400impl ExporterOutput {
401 fn is_headers_empty(headers: &Option<HashMap<String, String>>) -> bool {
403 headers.as_ref().is_none_or(|h| h.is_empty())
404 }
405}
406
407#[derive(Debug)]
431pub struct OtlpStdoutSpanExporter {
432 compression_level: u8,
434 resource: Option<Resource>,
436 headers: Option<HashMap<String, String>>,
438 output: Arc<dyn Output>,
440 level: Option<LogLevel>,
442}
443
444impl Default for OtlpStdoutSpanExporter {
445 fn default() -> Self {
446 Self::builder().build()
447 }
448}
449#[bon]
450impl OtlpStdoutSpanExporter {
451 #[builder]
471 pub fn new(
472 compression_level: Option<u8>,
473 resource: Option<Resource>,
474 headers: Option<HashMap<String, String>>,
475 output: Option<Arc<dyn Output>>,
476 level: Option<LogLevel>,
477 pipe: Option<bool>,
478 ) -> Self {
479 let compression_level = match env::var(env_vars::COMPRESSION_LEVEL) {
481 Ok(value) => match value.parse::<u8>() {
482 Ok(level) if level <= 9 => level,
483 Ok(level) => {
484 log::warn!(
485 "Invalid value in {}: {} (must be 0-9), using fallback",
486 env_vars::COMPRESSION_LEVEL,
487 level
488 );
489 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
490 }
491 Err(_) => {
492 log::warn!(
493 "Failed to parse {}: {}, using fallback",
494 env_vars::COMPRESSION_LEVEL,
495 value
496 );
497 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
498 }
499 },
500 Err(_) => {
501 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
503 }
504 };
505
506 let headers = match headers {
508 Some(constructor_headers) => {
509 if let Some(env_headers) = Self::parse_headers() {
510 let mut merged = constructor_headers;
512 merged.extend(env_headers);
513 Some(merged)
514 } else {
515 Some(constructor_headers)
517 }
518 }
519 None => Self::parse_headers(), };
521
522 let level = match env::var(env_vars::LOG_LEVEL) {
524 Ok(value) => match LogLevel::from_str(&value) {
525 Ok(log_level) => Some(log_level),
526 Err(e) => {
527 log::warn!(
528 "Invalid log level in {}: {}, using fallback",
529 env_vars::LOG_LEVEL,
530 e
531 );
532 level
533 }
534 },
535 Err(_) => {
536 level
538 }
539 };
540
541 let use_pipe = match env::var(env_vars::OUTPUT_TYPE) {
543 Ok(value) => value.to_lowercase() == "pipe",
544 Err(_) => pipe.unwrap_or(false),
545 };
546
547 let output = output.unwrap_or_else(|| create_output(use_pipe));
549
550 Self {
551 compression_level,
552 resource,
553 headers,
554 output,
555 level,
556 }
557 }
558
559 fn get_service_name() -> String {
567 env::var(env_vars::SERVICE_NAME)
568 .or_else(|_| env::var(env_vars::AWS_LAMBDA_FUNCTION_NAME))
569 .unwrap_or_else(|_| defaults::SERVICE_NAME.to_string())
570 }
571
572 #[cfg(test)]
573 fn with_test_output() -> (Self, Arc<TestOutput>) {
574 let output = Arc::new(TestOutput::new());
575
576 let exporter = Self::builder().output(output.clone()).build();
578
579 (exporter, output)
580 }
581
582 fn parse_headers() -> Option<HashMap<String, String>> {
587 let get_headers = |var_name: &str| -> Option<HashMap<String, String>> {
589 env::var(var_name).ok().map(|header_str| {
590 let mut map = HashMap::new();
591 Self::parse_header_string(&header_str, &mut map);
592 map
593 })
594 };
595
596 let global_headers = get_headers("OTEL_EXPORTER_OTLP_HEADERS");
598 let trace_headers = get_headers("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
599
600 if global_headers.is_none() && trace_headers.is_none() {
602 return None;
603 }
604
605 let mut result = HashMap::new();
607
608 if let Some(headers) = global_headers {
610 result.extend(headers);
611 }
612
613 if let Some(headers) = trace_headers {
615 result.extend(headers);
616 }
617
618 if result.is_empty() {
620 None
621 } else {
622 Some(result)
623 }
624 }
625
626 fn parse_header_string(header_str: &str, headers: &mut HashMap<String, String>) {
633 for pair in header_str.split(',') {
634 if let Some((key, value)) = pair.split_once('=') {
635 let key = key.trim().to_lowercase();
636 if key != "content-type" && key != "content-encoding" {
638 headers.insert(key, value.trim().to_string());
639 }
640 }
641 }
642 }
643}
644
645impl SpanExporter for OtlpStdoutSpanExporter {
646 fn export(
663 &self,
664 batch: Vec<SpanData>,
665 ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
666 if batch.is_empty() && self.output.is_pipe() {
668 let touch_result = self.output.touch_pipe();
670 return Box::pin(std::future::ready(touch_result));
671 }
672
673 let result = (|| {
675 let resource = self
677 .resource
678 .clone()
679 .unwrap_or_else(|| opentelemetry_sdk::Resource::builder_empty().build());
680 let resource_attrs = ResourceAttributesWithSchema::from(&resource);
681 let resource_spans = group_spans_by_resource_and_scope(batch, &resource_attrs);
682 let request = ExportTraceServiceRequest { resource_spans };
683
684 let proto_bytes = request.encode_to_vec();
686
687 let mut encoder =
689 GzEncoder::new(Vec::new(), Compression::new(self.compression_level as u32));
690 encoder
691 .write_all(&proto_bytes)
692 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
693 let compressed_bytes = encoder
694 .finish()
695 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
696
697 let payload = base64_engine.encode(compressed_bytes);
699
700 let output_data = ExporterOutput {
702 version: VERSION.to_string(),
703 source: Self::get_service_name(),
704 endpoint: defaults::ENDPOINT.to_string(),
705 method: "POST".to_string(),
706 content_type: "application/x-protobuf".to_string(),
707 content_encoding: "gzip".to_string(),
708 headers: self.headers.clone(),
709 payload,
710 base64: true,
711 level: self.level.map(|l| l.to_string()),
712 };
713
714 self.output.write_line(
716 &serde_json::to_string(&output_data)
717 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?,
718 )?;
719
720 Ok(())
721 })();
722
723 Box::pin(std::future::ready(result))
725 }
726
727 fn shutdown_with_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), OTelSdkError> {
736 Ok(())
737 }
738
739 fn force_flush(&mut self) -> Result<(), OTelSdkError> {
747 Ok(())
748 }
749
750 fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
760 self.resource = Some(<opentelemetry_sdk::Resource as Into<Resource>>::into(
761 resource.clone(),
762 ));
763 }
764}
765
766#[cfg(doctest)]
767#[macro_use]
768extern crate doc_comment;
769
770#[cfg(doctest)]
771use doc_comment::doctest;
772
773#[cfg(doctest)]
774doctest!("../README.md", readme);
775
776#[cfg(test)]
778#[derive(Debug, Default)]
779struct TestOutput {
780 buffer: Arc<Mutex<Vec<String>>>,
781}
782
783#[cfg(test)]
784impl TestOutput {
785 fn new() -> Self {
786 Self {
787 buffer: Arc::new(Mutex::new(Vec::new())),
788 }
789 }
790
791 fn get_output(&self) -> Vec<String> {
792 self.buffer.lock().unwrap().clone()
793 }
794}
795
796#[cfg(test)]
797impl Output for TestOutput {
798 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
799 self.buffer.lock().unwrap().push(line.to_string());
800 Ok(())
801 }
802
803 fn is_pipe(&self) -> bool {
804 false }
806
807 }
809
810#[cfg(test)]
811mod tests {
812 use super::*;
813 #[cfg(unix)]
814 use nix::{sys::stat::Mode, unistd::mkfifo};
815 use opentelemetry::{
816 trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
817 InstrumentationScope, KeyValue,
818 };
819 use opentelemetry_proto::tonic::{
820 common::v1::any_value::Value as AnyValue, trace::v1::SpanFlags,
821 };
822 use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks};
823 use serde_json::Value;
824 use serial_test::serial;
825 use std::{
826 fs::OpenOptions,
827 io::Read,
828 path::PathBuf,
829 sync::Arc,
830 thread,
831 time::{Duration, SystemTime, UNIX_EPOCH},
832 };
833
834 #[derive(Debug)]
835 struct FailingOutput;
836
837 impl Output for FailingOutput {
838 fn write_line(&self, _line: &str) -> Result<(), OTelSdkError> {
839 Err(OTelSdkError::InternalFailure(
840 "intentional test sink failure".to_string(),
841 ))
842 }
843
844 fn is_pipe(&self) -> bool {
845 false
846 }
847 }
848
849 fn create_test_span() -> SpanData {
850 let trace_id_bytes = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42];
851 let span_id_bytes = [0, 0, 0, 0, 0, 0, 0, 123];
852 let parent_id_bytes = [0, 0, 0, 0, 0, 0, 0, 42];
853
854 let span_context = SpanContext::new(
855 TraceId::from_bytes(trace_id_bytes),
856 SpanId::from_bytes(span_id_bytes),
857 TraceFlags::default(),
858 false,
859 TraceState::default(),
860 );
861
862 SpanData {
863 span_context,
864 parent_span_id: SpanId::from_bytes(parent_id_bytes),
865 parent_span_is_remote: false,
866 span_kind: SpanKind::Client,
867 name: "test-span".into(),
868 start_time: SystemTime::UNIX_EPOCH,
869 end_time: SystemTime::UNIX_EPOCH,
870 attributes: vec![KeyValue::new("test.key", "test-value")],
871 dropped_attributes_count: 0,
872 events: SpanEvents::default(),
873 links: SpanLinks::default(),
874 status: Status::Ok,
875 instrumentation_scope: InstrumentationScope::builder("test-library")
876 .with_version("1.0.0")
877 .with_schema_url("https://opentelemetry.io/schema/1.0.0")
878 .build(),
879 }
880 }
881
882 fn decode_export_request(json_str: &str) -> ExportTraceServiceRequest {
883 let json: Value = serde_json::from_str(json_str).unwrap();
884 let payload = json["payload"].as_str().unwrap();
885 let decoded = base64_engine.decode(payload).unwrap();
886
887 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
888 let mut decompressed = Vec::new();
889 decoder.read_to_end(&mut decompressed).unwrap();
890
891 ExportTraceServiceRequest::decode(&*decompressed).unwrap()
892 }
893
894 fn unique_test_pipe_path(name: &str) -> PathBuf {
895 let now = SystemTime::now()
896 .duration_since(UNIX_EPOCH)
897 .unwrap()
898 .as_nanos();
899 std::env::temp_dir().join(format!(
900 "otlp-stdout-span-exporter-{name}-{}-{now}.fifo",
901 std::process::id()
902 ))
903 }
904
905 #[cfg(unix)]
906 fn create_test_fifo(name: &str) -> PathBuf {
907 let path = unique_test_pipe_path(name);
908 let _ = std::fs::remove_file(&path);
909 mkfifo(&path, Mode::from_bits_truncate(0o600)).unwrap();
910 path
911 }
912
913 #[test]
914 fn test_parse_headers() {
915 std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", "key1=value1,key2=value2");
916 std::env::set_var(
917 "OTEL_EXPORTER_OTLP_TRACES_HEADERS",
918 "key2=override,key3=value3",
919 );
920
921 let headers = OtlpStdoutSpanExporter::parse_headers();
922
923 assert!(headers.is_some());
925 let headers = headers.unwrap();
926
927 assert_eq!(headers.get("key1").unwrap(), "value1");
928 assert_eq!(headers.get("key2").unwrap(), "override");
929 assert_eq!(headers.get("key3").unwrap(), "value3");
930
931 std::env::remove_var("OTEL_EXPORTER_OTLP_HEADERS");
933 std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
934 }
935
936 #[test]
937 fn test_service_name_resolution() {
938 std::env::set_var(env_vars::SERVICE_NAME, "otel-service");
940 std::env::set_var(env_vars::AWS_LAMBDA_FUNCTION_NAME, "lambda-function");
941 assert_eq!(OtlpStdoutSpanExporter::get_service_name(), "otel-service");
942
943 std::env::remove_var(env_vars::SERVICE_NAME);
945 assert_eq!(
946 OtlpStdoutSpanExporter::get_service_name(),
947 "lambda-function"
948 );
949
950 std::env::remove_var(env_vars::AWS_LAMBDA_FUNCTION_NAME);
952 assert_eq!(
953 OtlpStdoutSpanExporter::get_service_name(),
954 defaults::SERVICE_NAME
955 );
956 }
957
958 #[test]
959 fn test_compression_level_precedence() {
960 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
962 let exporter = OtlpStdoutSpanExporter::builder()
963 .compression_level(7)
964 .build();
965 assert_eq!(exporter.compression_level, 3);
966
967 std::env::set_var(env_vars::COMPRESSION_LEVEL, "invalid");
969 let exporter = OtlpStdoutSpanExporter::builder()
970 .compression_level(7)
971 .build();
972 assert_eq!(exporter.compression_level, 7);
973
974 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
976 let exporter = OtlpStdoutSpanExporter::builder()
977 .compression_level(7)
978 .build();
979 assert_eq!(exporter.compression_level, 7);
980
981 let exporter = OtlpStdoutSpanExporter::builder()
983 .compression_level(defaults::COMPRESSION_LEVEL)
984 .build();
985 assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
986 }
987
988 #[test]
989 fn test_new_uses_env_compression_level() {
990 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
992 let exporter = OtlpStdoutSpanExporter::default();
993 assert_eq!(exporter.compression_level, 3);
994
995 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
997 let exporter = OtlpStdoutSpanExporter::default();
998 assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
999 }
1000
1001 #[tokio::test]
1002 #[serial]
1003 async fn test_compression_level_affects_output_size() {
1004 let mut spans = Vec::new();
1006 for i in 0..100 {
1007 let mut span = create_test_span();
1008 span.attributes.push(KeyValue::new("index", i));
1010 span.attributes
1012 .push(KeyValue::new("data", "a".repeat(1000)));
1013 spans.push(span);
1014 }
1015
1016 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1018
1019 let no_compression_output = Arc::new(TestOutput::new());
1021 let no_compression_exporter = OtlpStdoutSpanExporter {
1022 compression_level: 0,
1023 resource: None,
1024 output: no_compression_output.clone() as Arc<dyn Output>,
1025 headers: None,
1026 level: None,
1027 };
1028 let _ = no_compression_exporter.export(spans.clone()).await;
1029 let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
1030
1031 let max_compression_output = Arc::new(TestOutput::new());
1033 let max_compression_exporter = OtlpStdoutSpanExporter {
1034 compression_level: 9,
1035 resource: None,
1036 output: max_compression_output.clone() as Arc<dyn Output>,
1037 headers: None,
1038 level: None,
1039 };
1040 let _ = max_compression_exporter.export(spans.clone()).await;
1041 let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
1042
1043 assert!(no_compression_size > max_compression_size,
1045 "Maximum compression (level 9) should produce output no larger than no compression (level 0). Got {} vs {}",
1046 max_compression_size, no_compression_size);
1047
1048 let no_compression_spans = decode_and_count_spans(&no_compression_output.get_output()[0]);
1050 let max_compression_spans = decode_and_count_spans(&max_compression_output.get_output()[0]);
1051
1052 assert_eq!(
1053 no_compression_spans,
1054 spans.len(),
1055 "No compression output should contain all spans"
1056 );
1057 assert_eq!(
1058 max_compression_spans,
1059 spans.len(),
1060 "Maximum compression output should contain all spans"
1061 );
1062 }
1063
1064 fn extract_payload_size(json_str: &str) -> usize {
1066 let json: Value = serde_json::from_str(json_str).unwrap();
1067 let payload = json["payload"].as_str().unwrap();
1068 base64_engine.decode(payload).unwrap().len()
1069 }
1070
1071 fn decode_and_count_spans(json_str: &str) -> usize {
1073 let request = decode_export_request(json_str);
1074
1075 let mut span_count = 0;
1077 for resource_span in &request.resource_spans {
1078 for scope_span in &resource_span.scope_spans {
1079 span_count += scope_span.spans.len();
1080 }
1081 }
1082
1083 span_count
1084 }
1085
1086 #[tokio::test]
1087 async fn test_export_single_span() {
1088 let (exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1089 let span = create_test_span();
1090
1091 let result = exporter.export(vec![span]).await;
1092 assert!(result.is_ok());
1093
1094 let output = output.get_output();
1095 assert_eq!(output.len(), 1);
1096
1097 let json: Value = serde_json::from_str(&output[0]).unwrap();
1099 assert_eq!(json["__otel_otlp_stdout"], VERSION);
1100 assert_eq!(json["method"], "POST");
1101 assert_eq!(json["content-type"], "application/x-protobuf");
1102 assert_eq!(json["content-encoding"], "gzip");
1103 assert_eq!(json["base64"], true);
1104
1105 let payload = json["payload"].as_str().unwrap();
1107 let decoded = base64_engine.decode(payload).unwrap();
1108
1109 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
1111 let mut decompressed = Vec::new();
1112 decoder.read_to_end(&mut decompressed).unwrap();
1113
1114 let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
1116 assert_eq!(request.resource_spans.len(), 1);
1117 }
1118
1119 #[tokio::test]
1120 async fn test_export_preserves_remote_parent_flags_and_resource_attributes() {
1121 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1122 let resource = Resource::builder_empty()
1123 .with_attributes([
1124 KeyValue::new("service.name", "span-exporter-tests"),
1125 KeyValue::new("deployment.environment", "test"),
1126 ])
1127 .build();
1128 exporter.set_resource(&resource);
1129
1130 let mut span = create_test_span();
1131 span.parent_span_is_remote = true;
1132
1133 exporter.export(vec![span]).await.unwrap();
1134
1135 let output = output.get_output();
1136 assert_eq!(output.len(), 1);
1137
1138 let request = decode_export_request(&output[0]);
1139 let resource_span = request.resource_spans.first().unwrap();
1140 let scope_span = resource_span.scope_spans.first().unwrap();
1141 let exported_span = scope_span.spans.first().unwrap();
1142
1143 assert_eq!(
1144 exported_span.flags & SpanFlags::ContextHasIsRemoteMask as u32,
1145 SpanFlags::ContextHasIsRemoteMask as u32
1146 );
1147 assert_eq!(
1148 exported_span.flags & SpanFlags::ContextIsRemoteMask as u32,
1149 SpanFlags::ContextIsRemoteMask as u32
1150 );
1151
1152 let resource = resource_span.resource.as_ref().unwrap();
1153 let attrs = &resource.attributes;
1154 assert!(attrs.iter().any(|attr| {
1155 attr.key == "service.name"
1156 && attr.value.as_ref().and_then(|value| value.value.as_ref())
1157 == Some(&AnyValue::StringValue("span-exporter-tests".to_string()))
1158 }));
1159 assert!(attrs.iter().any(|attr| {
1160 attr.key == "deployment.environment"
1161 && attr.value.as_ref().and_then(|value| value.value.as_ref())
1162 == Some(&AnyValue::StringValue("test".to_string()))
1163 }));
1164 }
1165
1166 #[tokio::test]
1167 async fn test_export_empty_batch() {
1168 let exporter = OtlpStdoutSpanExporter::default();
1169 let result = exporter.export(vec![]).await;
1170 assert!(result.is_ok());
1171 }
1172
1173 #[tokio::test]
1174 async fn test_export_propagates_output_errors() {
1175 let exporter = OtlpStdoutSpanExporter::builder()
1176 .output(Arc::new(FailingOutput))
1177 .build();
1178
1179 let err = exporter.export(vec![create_test_span()]).await.unwrap_err();
1180 assert!(
1181 matches!(err, OTelSdkError::InternalFailure(message) if message == "intentional test sink failure")
1182 );
1183 }
1184
1185 #[test]
1186 #[serial]
1187 fn test_gzip_level_configuration() {
1188 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1190
1191 let exporter = OtlpStdoutSpanExporter::builder()
1193 .compression_level(9)
1194 .build();
1195 assert_eq!(exporter.compression_level, 9);
1196 }
1197
1198 #[tokio::test]
1199 #[serial]
1200 async fn test_env_var_affects_export_compression() {
1201 let span = create_test_span();
1203 let mut spans = Vec::new();
1204 for i in 0..100 {
1206 let mut span = span.clone();
1207 span.attributes
1209 .push(KeyValue::new(format!("test-key-{}", i), "a".repeat(1000)));
1210 spans.push(span);
1211 }
1212
1213 std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
1215 let no_compression_output = Arc::new(TestOutput::new());
1216 let mut no_compression_exporter = OtlpStdoutSpanExporter::builder()
1217 .compression_level(0)
1218 .build();
1219 no_compression_exporter.output = no_compression_output.clone() as Arc<dyn Output>;
1220 let _ = no_compression_exporter.export(spans.clone()).await;
1221 let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
1222
1223 std::env::set_var(env_vars::COMPRESSION_LEVEL, "9");
1225 let max_compression_output = Arc::new(TestOutput::new());
1226 let mut max_compression_exporter = OtlpStdoutSpanExporter::builder()
1227 .compression_level(9)
1228 .build();
1229 max_compression_exporter.output = max_compression_output.clone() as Arc<dyn Output>;
1230 let _ = max_compression_exporter.export(spans.clone()).await;
1231 let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
1232
1233 assert!(no_compression_size > max_compression_size,
1235 "Environment variable COMPRESSION_LEVEL=9 should produce smaller output than COMPRESSION_LEVEL=0. Got {} vs {}",
1236 max_compression_size, no_compression_size);
1237
1238 std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
1240 let explicit_output = Arc::new(TestOutput::new());
1241
1242 let explicit_exporter = OtlpStdoutSpanExporter::builder()
1244 .output(explicit_output.clone())
1245 .build();
1246
1247 let _ = explicit_exporter.export(spans.clone()).await;
1249 let explicit_size = extract_payload_size(&explicit_output.get_output()[0]);
1250
1251 assert!(explicit_size > max_compression_size,
1254 "Environment variable should take precedence over explicitly set level. Expected size closer to {} but got {}",
1255 no_compression_size, explicit_size);
1256
1257 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1259 }
1260
1261 #[tokio::test]
1262 #[serial]
1263 async fn test_environment_variable_precedence() {
1264 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
1266
1267 let exporter = OtlpStdoutSpanExporter::builder()
1270 .compression_level(9)
1271 .build();
1272 assert_eq!(exporter.compression_level, 3);
1273
1274 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1276 let exporter = OtlpStdoutSpanExporter::builder()
1277 .compression_level(9)
1278 .build();
1279 assert_eq!(exporter.compression_level, 9);
1280 }
1281
1282 #[test]
1283 fn test_exporter_output_deserialization() {
1284 let json_str = r#"{
1286 "__otel_otlp_stdout": "0.11.1",
1287 "source": "test-service",
1288 "endpoint": "http://localhost:4318/v1/traces",
1289 "method": "POST",
1290 "content-type": "application/x-protobuf",
1291 "content-encoding": "gzip",
1292 "headers": {
1293 "api-key": "test-key",
1294 "custom-header": "test-value"
1295 },
1296 "payload": "SGVsbG8gd29ybGQ=",
1297 "base64": true
1298 }"#;
1299
1300 let output: ExporterOutput = serde_json::from_str(json_str).unwrap();
1302
1303 assert_eq!(output.version, "0.11.1");
1305 assert_eq!(output.source, "test-service");
1306 assert_eq!(output.endpoint, "http://localhost:4318/v1/traces");
1307 assert_eq!(output.method, "POST");
1308 assert_eq!(output.content_type, "application/x-protobuf");
1309 assert_eq!(output.content_encoding, "gzip");
1310 assert_eq!(output.headers.as_ref().unwrap().len(), 2);
1311 assert_eq!(
1312 output.headers.as_ref().unwrap().get("api-key").unwrap(),
1313 "test-key"
1314 );
1315 assert_eq!(
1316 output
1317 .headers
1318 .as_ref()
1319 .unwrap()
1320 .get("custom-header")
1321 .unwrap(),
1322 "test-value"
1323 );
1324 assert_eq!(output.payload, "SGVsbG8gd29ybGQ=");
1325 assert!(output.base64);
1326
1327 let decoded = base64_engine.decode(&output.payload).unwrap();
1329 let payload_text = String::from_utf8(decoded).unwrap();
1330 assert_eq!(payload_text, "Hello world");
1331 }
1332
1333 #[test]
1334 fn test_exporter_output_deserialization_dynamic() {
1335 let version = "0.11.1".to_string();
1337 let service = "dynamic-service".to_string();
1338 let payload = base64_engine.encode("Dynamic payload");
1339
1340 let json_str = format!(
1342 r#"{{
1343 "__otel_otlp_stdout": "{}",
1344 "source": "{}",
1345 "endpoint": "http://localhost:4318/v1/traces",
1346 "method": "POST",
1347 "content-type": "application/x-protobuf",
1348 "content-encoding": "gzip",
1349 "headers": {{
1350 "dynamic-key": "dynamic-value"
1351 }},
1352 "payload": "{}",
1353 "base64": true
1354 }}"#,
1355 version, service, payload
1356 );
1357
1358 let output: ExporterOutput = serde_json::from_str(&json_str).unwrap();
1360
1361 assert_eq!(output.version, version);
1363 assert_eq!(output.source, service);
1364 assert_eq!(output.endpoint, "http://localhost:4318/v1/traces");
1365 assert_eq!(output.method, "POST");
1366 assert_eq!(output.content_type, "application/x-protobuf");
1367 assert_eq!(output.content_encoding, "gzip");
1368 assert_eq!(output.headers.as_ref().unwrap().len(), 1);
1369 assert_eq!(
1370 output.headers.as_ref().unwrap().get("dynamic-key").unwrap(),
1371 "dynamic-value"
1372 );
1373 assert_eq!(output.payload, payload);
1374 assert!(output.base64);
1375
1376 let decoded = base64_engine.decode(&output.payload).unwrap();
1378 let payload_text = String::from_utf8(decoded).unwrap();
1379 assert_eq!(payload_text, "Dynamic payload");
1380 }
1381
1382 #[test]
1383 fn test_log_level_from_str() {
1384 assert_eq!(LogLevel::from_str("debug").unwrap(), LogLevel::Debug);
1385 assert_eq!(LogLevel::from_str("DEBUG").unwrap(), LogLevel::Debug);
1386 assert_eq!(LogLevel::from_str("info").unwrap(), LogLevel::Info);
1387 assert_eq!(LogLevel::from_str("INFO").unwrap(), LogLevel::Info);
1388 assert_eq!(LogLevel::from_str("warn").unwrap(), LogLevel::Warn);
1389 assert_eq!(LogLevel::from_str("warning").unwrap(), LogLevel::Warn);
1390 assert_eq!(LogLevel::from_str("WARN").unwrap(), LogLevel::Warn);
1391 assert_eq!(LogLevel::from_str("error").unwrap(), LogLevel::Error);
1392 assert_eq!(LogLevel::from_str("ERROR").unwrap(), LogLevel::Error);
1393
1394 assert!(LogLevel::from_str("invalid").is_err());
1395 }
1396
1397 #[test]
1398 fn test_log_level_display() {
1399 assert_eq!(LogLevel::Debug.to_string(), "DEBUG");
1400 assert_eq!(LogLevel::Info.to_string(), "INFO");
1401 assert_eq!(LogLevel::Warn.to_string(), "WARN");
1402 assert_eq!(LogLevel::Error.to_string(), "ERROR");
1403 }
1404
1405 #[test]
1406 fn test_buffer_output_round_trip() {
1407 let output = BufferOutput::new();
1408 output.write_line("first").unwrap();
1409 output.write_line("second").unwrap();
1410
1411 assert_eq!(
1412 output.take_lines().unwrap(),
1413 vec!["first".to_string(), "second".to_string()]
1414 );
1415 assert!(output.take_lines().unwrap().is_empty());
1416 assert!(format!("{output:?}").contains("BufferOutput"));
1417 }
1418
1419 #[test]
1420 #[serial]
1421 fn test_log_level_from_env() {
1422 std::env::set_var(env_vars::LOG_LEVEL, "debug");
1424 let exporter = OtlpStdoutSpanExporter::default();
1425 assert_eq!(exporter.level, Some(LogLevel::Debug));
1426
1427 std::env::set_var(env_vars::LOG_LEVEL, "invalid");
1429 let exporter = OtlpStdoutSpanExporter::default();
1430 assert_eq!(exporter.level, None);
1431
1432 std::env::remove_var(env_vars::LOG_LEVEL);
1434 let exporter = OtlpStdoutSpanExporter::builder()
1435 .level(LogLevel::Error)
1436 .build();
1437 assert_eq!(exporter.level, Some(LogLevel::Error));
1438
1439 std::env::set_var(env_vars::LOG_LEVEL, "warn");
1441 let exporter = OtlpStdoutSpanExporter::builder()
1442 .level(LogLevel::Error)
1443 .build();
1444 assert_eq!(exporter.level, Some(LogLevel::Warn));
1445
1446 std::env::remove_var(env_vars::LOG_LEVEL);
1448 }
1449
1450 #[test]
1451 #[serial]
1452 fn test_invalid_numeric_compression_level_falls_back() {
1453 std::env::set_var(env_vars::COMPRESSION_LEVEL, "99");
1454 let exporter = OtlpStdoutSpanExporter::builder()
1455 .compression_level(4)
1456 .build();
1457 assert_eq!(exporter.compression_level, 4);
1458 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1459 }
1460
1461 #[test]
1462 #[serial]
1463 fn test_header_merge_and_filtering() {
1464 std::env::set_var(
1465 env_vars::OTLP_HEADERS,
1466 "content-type=bad, malformed, x-env=env-value",
1467 );
1468 std::env::set_var(
1469 env_vars::OTLP_TRACES_HEADERS,
1470 "content-encoding=bad, x-env=trace-value, x-trace=trace-only",
1471 );
1472
1473 let mut constructor_headers = HashMap::new();
1474 constructor_headers.insert("x-constructor".to_string(), "constructor-value".to_string());
1475 constructor_headers.insert("x-env".to_string(), "constructor-env".to_string());
1476
1477 let exporter = OtlpStdoutSpanExporter::builder()
1478 .headers(constructor_headers)
1479 .build();
1480 let headers = exporter.headers.unwrap();
1481
1482 assert_eq!(headers.get("x-constructor").unwrap(), "constructor-value");
1483 assert_eq!(headers.get("x-env").unwrap(), "trace-value");
1484 assert_eq!(headers.get("x-trace").unwrap(), "trace-only");
1485 assert!(!headers.contains_key("content-type"));
1486 assert!(!headers.contains_key("content-encoding"));
1487
1488 std::env::set_var(
1489 env_vars::OTLP_HEADERS,
1490 "content-type=bad, content-encoding=bad, malformed",
1491 );
1492 std::env::remove_var(env_vars::OTLP_TRACES_HEADERS);
1493 assert!(OtlpStdoutSpanExporter::parse_headers().is_none());
1494
1495 std::env::remove_var(env_vars::OTLP_HEADERS);
1496 std::env::remove_var(env_vars::OTLP_TRACES_HEADERS);
1497 }
1498
1499 #[test]
1500 fn test_shutdown_and_force_flush_are_noops() {
1501 let mut exporter = OtlpStdoutSpanExporter::default();
1502 assert!(exporter.force_flush().is_ok());
1503 assert!(exporter
1504 .shutdown_with_timeout(Duration::from_millis(1))
1505 .is_ok());
1506 }
1507
1508 #[tokio::test]
1509 #[serial]
1510 async fn test_log_level_in_output() {
1511 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1513 exporter.level = Some(LogLevel::Debug);
1514 let span = create_test_span();
1515
1516 let result = exporter.export(vec![span]).await;
1517 assert!(result.is_ok());
1518
1519 let output_lines = output.get_output();
1520 assert_eq!(output_lines.len(), 1);
1521
1522 let json: Value = serde_json::from_str(&output_lines[0]).unwrap();
1524 assert_eq!(json["level"], "DEBUG");
1525
1526 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1528 exporter.level = None;
1529 let span = create_test_span();
1530
1531 let result = exporter.export(vec![span]).await;
1532 assert!(result.is_ok());
1533
1534 let output_lines = output.get_output();
1535 assert_eq!(output_lines.len(), 1);
1536
1537 let json: Value = serde_json::from_str(&output_lines[0]).unwrap();
1539 assert!(!json.as_object().unwrap().contains_key("level"));
1540 }
1541
1542 #[cfg(unix)]
1543 #[test]
1544 fn test_named_pipe_output_writes_to_real_fifo() {
1545 let path = create_test_fifo("write-line");
1546 let path_for_reader = path.clone();
1547
1548 let reader = thread::spawn(move || {
1549 let mut file = OpenOptions::new()
1550 .read(true)
1551 .open(&path_for_reader)
1552 .unwrap();
1553 let mut contents = String::new();
1554 file.read_to_string(&mut contents).unwrap();
1555 contents
1556 });
1557
1558 let output = NamedPipeOutput { path: path.clone() };
1559 assert!(output.is_pipe());
1560 output.write_line("hello from fifo").unwrap();
1561
1562 let contents = reader.join().unwrap();
1563 assert_eq!(contents, "hello from fifo\n");
1564 std::fs::remove_file(path).unwrap();
1565 }
1566
1567 #[cfg(unix)]
1568 #[tokio::test]
1569 async fn test_export_empty_batch_touches_real_fifo() {
1570 let path = create_test_fifo("touch-pipe");
1571 let path_for_reader = path.clone();
1572
1573 let reader = thread::spawn(move || {
1574 let mut file = OpenOptions::new()
1575 .read(true)
1576 .open(&path_for_reader)
1577 .unwrap();
1578 let mut bytes = Vec::new();
1579 file.read_to_end(&mut bytes).unwrap();
1580 bytes
1581 });
1582
1583 let exporter = OtlpStdoutSpanExporter {
1584 compression_level: defaults::COMPRESSION_LEVEL,
1585 resource: None,
1586 headers: None,
1587 output: Arc::new(NamedPipeOutput { path: path.clone() }),
1588 level: None,
1589 };
1590
1591 exporter.export(vec![]).await.unwrap();
1592
1593 let bytes = reader.join().unwrap();
1594 assert!(bytes.is_empty());
1595 std::fs::remove_file(path).unwrap();
1596 }
1597
1598 #[test]
1599 fn test_stdout_output() {
1600 let output = create_output(false);
1601 assert!(format!("{:?}", output).contains("StdOutput"));
1603 }
1604
1605 #[test]
1606 fn test_pipe_output() {
1607 let output = create_output(true);
1608 let debug_str = format!("{:?}", output);
1610 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1611 }
1612
1613 #[test]
1614 fn test_env_var_precedence() {
1615 let temp_dir = std::env::temp_dir();
1617 let path = temp_dir.join("test_pipe");
1618
1619 std::env::remove_var(env_vars::OUTPUT_TYPE);
1621
1622 std::env::set_var(env_vars::OUTPUT_TYPE, "pipe");
1624
1625 let exporter = OtlpStdoutSpanExporter::default();
1627
1628 let debug_str = format!("{:?}", exporter.output);
1630 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1631
1632 std::env::remove_var(env_vars::OUTPUT_TYPE);
1634 if path.exists() {
1635 let _ = std::fs::remove_file(path);
1636 }
1637 }
1638
1639 #[test]
1640 fn test_constructor_precedence() {
1641 let temp_dir = std::env::temp_dir();
1643 let path = temp_dir.join("test_pipe");
1644
1645 std::env::remove_var(env_vars::OUTPUT_TYPE);
1647
1648 let exporter = OtlpStdoutSpanExporter::builder().pipe(true).build();
1650
1651 let debug_str = format!("{:?}", exporter.output);
1653 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1654
1655 if path.exists() {
1657 let _ = std::fs::remove_file(path);
1658 }
1659 }
1660
1661 #[test]
1662 fn test_env_var_overrides_constructor() {
1663 let temp_dir = std::env::temp_dir();
1665 let path = temp_dir.join("test_pipe");
1666
1667 std::env::set_var(env_vars::OUTPUT_TYPE, "pipe");
1669
1670 let exporter = OtlpStdoutSpanExporter::builder().pipe(false).build();
1672
1673 let debug_str = format!("{:?}", exporter.output);
1675 assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
1676
1677 std::env::remove_var(env_vars::OUTPUT_TYPE);
1679 if path.exists() {
1680 let _ = std::fs::remove_file(path);
1681 }
1682 }
1683}