otlp_stdout_span_exporter/
lib.rs

1//! A span exporter that writes OpenTelemetry spans to stdout in OTLP format.
2//!
3//! This crate provides an implementation of OpenTelemetry's [`SpanExporter`] that writes spans to stdout
4//! in OTLP (OpenTelemetry Protocol) format. It is particularly useful in serverless environments like
5//! AWS Lambda where writing to stdout is a common pattern for exporting telemetry data.
6//!
7//! # Features
8//!
9//! - Uses OTLP Protobuf serialization for efficient encoding
10//! - Applies GZIP compression with configurable levels
11//! - Detects service name from environment variables
12//! - Supports custom headers via environment variables
13//! - Consistent JSON output format
14//!
15//! # Example
16//!
17//! ```rust,no_run
18//! use opentelemetry::global;
19//! use opentelemetry::trace::Tracer;
20//! use opentelemetry_sdk::{trace::SdkTracerProvider, Resource};
21//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
22//!
23//! #[tokio::main]
24//! async fn main() {
25//!     // Create a new stdout exporter
26//!     let exporter = OtlpStdoutSpanExporter::new();
27//!
28//!     // Create a new tracer provider with batch export
29//!     let provider = SdkTracerProvider::builder()
30//!         .with_batch_exporter(exporter)
31//!         .build();
32//!
33//!     // Register the provider with the OpenTelemetry global API
34//!     global::set_tracer_provider(provider.clone());
35//!
36//!     // Create a tracer
37//!     let tracer = global::tracer("my-service");
38//!
39//!     // Create spans
40//!     tracer.in_span("parent-operation", |_cx| {
41//!         println!("Doing work...");
42//!         
43//!         // Create nested spans
44//!         tracer.in_span("child-operation", |_cx| {
45//!             println!("Doing more work...");
46//!         });
47//!     });
48//!     
49//!     // Flush the provider to ensure all spans are exported
50//!     if let Err(err) = provider.force_flush() {
51//!         println!("Error flushing provider: {:?}", err);
52//!     }
53//! }
54//! ```
55//!
56//! # Environment Variables
57//!
58//! The exporter respects the following environment variables:
59//!
60//! - `OTEL_SERVICE_NAME`: Service name to use in output
61//! - `AWS_LAMBDA_FUNCTION_NAME`: Fallback service name (if `OTEL_SERVICE_NAME` not set)
62//! - `OTEL_EXPORTER_OTLP_HEADERS`: Global headers for OTLP export
63//! - `OTEL_EXPORTER_OTLP_TRACES_HEADERS`: Trace-specific headers (takes precedence if conflicting with `OTEL_EXPORTER_OTLP_HEADERS`)
64//! - `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL`: GZIP compression level (0-9, default: 6)
65//!
66//! # Output Format
67//!
68//! The exporter writes each batch of spans as a JSON object to stdout:
69//!
70//! ```json
71//! {
72//!   "__otel_otlp_stdout": "0.1.0",
73//!   "source": "my-service",
74//!   "endpoint": "http://localhost:4318/v1/traces",
75//!   "method": "POST",
76//!   "content-type": "application/x-protobuf",
77//!   "content-encoding": "gzip",
78//!   "headers": {
79//!     "api-key": "secret123",
80//!     "custom-header": "value"
81//!   },
82//!   "payload": "<base64-encoded-gzipped-protobuf>",
83//!   "base64": true
84//! }
85//! ```
86
87use async_trait::async_trait;
88use base64::{engine::general_purpose::STANDARD as base64_engine, Engine};
89use flate2::{write::GzEncoder, Compression};
90use futures_util::future::BoxFuture;
91use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
92use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
93use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
94use opentelemetry_sdk::resource::Resource;
95use opentelemetry_sdk::{
96    error::OTelSdkError,
97    trace::{SpanData, SpanExporter},
98};
99use prost::Message;
100use serde::Serialize;
101#[cfg(test)]
102use std::sync::Mutex;
103use std::{
104    collections::HashMap,
105    env,
106    io::{self, Write},
107    result::Result,
108    sync::Arc,
109};
110
111const VERSION: &str = env!("CARGO_PKG_VERSION");
112const DEFAULT_ENDPOINT: &str = "http://localhost:4318/v1/traces";
113const DEFAULT_COMPRESSION_LEVEL: u8 = 6;
114const COMPRESSION_LEVEL_ENV_VAR: &str = "OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL";
115
116/// Trait for output handling
117///
118/// This trait defines the interface for writing output lines. It is implemented
119/// by both the standard output handler and test output handler.
120trait Output: Send + Sync + std::fmt::Debug {
121    /// Writes a single line of output
122    ///
123    /// # Arguments
124    ///
125    /// * `line` - The line to write
126    ///
127    /// # Returns
128    ///
129    /// Returns `Ok(())` if the write was successful, or a `TraceError` if it failed
130    fn write_line(&self, line: &str) -> Result<(), OTelSdkError>;
131}
132
133/// Standard output implementation that writes to stdout
134#[derive(Debug, Default)]
135struct StdOutput;
136
137impl Output for StdOutput {
138    fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
139        // Get a locked stdout handle once
140        let stdout = io::stdout();
141        let mut handle = stdout.lock();
142
143        // Write the line and a newline in one operation
144        writeln!(handle, "{}", line).map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
145
146        Ok(())
147    }
148}
149
/// [`Output`] implementation for tests: records every line in memory instead
/// of printing it.
#[cfg(test)]
#[derive(Default, Debug)]
struct TestOutput {
    /// Lines written so far, in write order.
    buffer: Arc<Mutex<Vec<String>>>,
}

#[cfg(test)]
impl TestOutput {
    /// Creates a sink with an empty capture buffer.
    fn new() -> Self {
        Self::default()
    }

    /// Returns a copy of every line captured so far.
    ///
    /// Panics if the buffer's mutex has been poisoned.
    fn get_output(&self) -> Vec<String> {
        let captured = self.buffer.lock().unwrap();
        captured.clone()
    }
}

#[cfg(test)]
impl Output for TestOutput {
    /// Appends `line` to the capture buffer and returns `Ok(())`.
    ///
    /// Panics if the buffer's mutex has been poisoned.
    fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
        let mut captured = self.buffer.lock().unwrap();
        captured.push(line.to_owned());
        Ok(())
    }
}
177
178/// Output format for the OTLP stdout exporter
179///
180/// This struct defines the JSON structure that will be written to stdout
181/// for each batch of spans.
182#[derive(Debug, Serialize)]
183struct ExporterOutput<'a> {
184    /// Version identifier for the output format
185    #[serde(rename = "__otel_otlp_stdout")]
186    version: &'a str,
187    /// Service name that generated the spans
188    source: String,
189    /// OTLP endpoint (always http://localhost:4318/v1/traces)
190    endpoint: &'a str,
191    /// HTTP method (always POST)
192    method: &'a str,
193    /// Content type (always application/x-protobuf)
194    #[serde(rename = "content-type")]
195    content_type: &'a str,
196    /// Content encoding (always gzip)
197    #[serde(rename = "content-encoding")]
198    content_encoding: &'a str,
199    /// Custom headers from environment variables
200    #[serde(skip_serializing_if = "HashMap::is_empty")]
201    headers: HashMap<String, String>,
202    /// Base64-encoded, gzipped, protobuf-serialized span data
203    payload: String,
204    /// Whether the payload is base64 encoded (always true)
205    base64: bool,
206}
207
208/// A span exporter that writes spans to stdout in OTLP format
209///
210/// This exporter implements the OpenTelemetry [`SpanExporter`] trait and writes spans
211/// to stdout in OTLP format with Protobuf serialization and GZIP compression.
212///
213/// # Features
214///
215/// - Configurable GZIP compression level (0-9)
216/// - Environment variable support for service name and headers
217/// - Efficient batching of spans
218/// - Base64 encoding of compressed data
219///
220/// # Example
221///
222/// ```rust,no_run
223/// use opentelemetry_sdk::runtime;
224/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
225///
226/// // Create an exporter with maximum compression
227/// let exporter = OtlpStdoutSpanExporter::with_gzip_level(9);
228/// ```
229#[derive(Debug)]
230pub struct OtlpStdoutSpanExporter {
231    /// GZIP compression level (0-9)
232    gzip_level: u8,
233    /// Optional resource to be included with all spans
234    resource: Option<Resource>,
235    /// Output implementation (stdout or test buffer)
236    output: Arc<dyn Output>,
237}
238
239impl Default for OtlpStdoutSpanExporter {
240    fn default() -> Self {
241        Self::new()
242    }
243}
244
245impl OtlpStdoutSpanExporter {
246    /// Creates a new exporter with default configuration
247    ///
248    /// The default GZIP compression level is determined by:
249    /// 1. The `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL` environment variable if set
250    /// 2. Otherwise, defaults to level 6
251    ///
252    /// # Example
253    ///
254    /// ```rust
255    /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
256    ///
257    /// let exporter = OtlpStdoutSpanExporter::new();
258    /// ```
259    pub fn new() -> Self {
260        let gzip_level =
261            Self::get_compression_level_from_env().unwrap_or(DEFAULT_COMPRESSION_LEVEL);
262        Self {
263            gzip_level,
264            resource: None,
265            output: Arc::new(StdOutput),
266        }
267    }
268
269    /// Creates a new exporter with custom GZIP compression level
270    ///
271    /// This method explicitly sets the compression level, overriding any value
272    /// set in the `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL` environment variable.
273    ///
274    /// # Arguments
275    ///
276    /// * `gzip_level` - GZIP compression level (0-9, where 0 is no compression and 9 is maximum compression)
277    ///
278    /// # Example
279    ///
280    /// ```rust
281    /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
282    ///
283    /// // Create an exporter with maximum compression
284    /// let exporter = OtlpStdoutSpanExporter::with_gzip_level(9);
285    /// ```
286    pub fn with_gzip_level(gzip_level: u8) -> Self {
287        Self {
288            gzip_level,
289            resource: None,
290            output: Arc::new(StdOutput),
291        }
292    }
293
294    /// Get the compression level from environment variable
295    ///
296    /// This function tries to read the compression level from the
297    /// `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL` environment variable.
298    /// It returns None if the variable is not set or cannot be parsed as a u8.
299    fn get_compression_level_from_env() -> Option<u8> {
300        env::var(COMPRESSION_LEVEL_ENV_VAR)
301            .ok()
302            .and_then(|val| val.parse::<u8>().ok())
303            .and_then(|level| if level <= 9 { Some(level) } else { None })
304    }
305
306    #[cfg(test)]
307    fn with_test_output() -> (Self, Arc<TestOutput>) {
308        let output = Arc::new(TestOutput::new());
309        let gzip_level =
310            Self::get_compression_level_from_env().unwrap_or(DEFAULT_COMPRESSION_LEVEL);
311        let exporter = Self {
312            gzip_level,
313            resource: None,
314            output: output.clone() as Arc<dyn Output>,
315        };
316        (exporter, output)
317    }
318
319    /// Parse headers from environment variables
320    ///
321    /// This function reads headers from both global and trace-specific
322    /// environment variables, with trace-specific headers taking precedence.
323    fn parse_headers() -> HashMap<String, String> {
324        let mut headers = HashMap::new();
325
326        // Parse global headers first
327        if let Ok(global_headers) = env::var("OTEL_EXPORTER_OTLP_HEADERS") {
328            Self::parse_header_string(&global_headers, &mut headers);
329        }
330
331        // Parse trace-specific headers (these take precedence)
332        if let Ok(trace_headers) = env::var("OTEL_EXPORTER_OTLP_TRACES_HEADERS") {
333            Self::parse_header_string(&trace_headers, &mut headers);
334        }
335
336        headers
337    }
338
339    /// Parse a header string in the format key1=value1,key2=value2
340    ///
341    /// # Arguments
342    ///
343    /// * `header_str` - The header string to parse
344    /// * `headers` - The map to store parsed headers in
345    fn parse_header_string(header_str: &str, headers: &mut HashMap<String, String>) {
346        for pair in header_str.split(',') {
347            if let Some((key, value)) = pair.split_once('=') {
348                let key = key.trim().to_lowercase();
349                // Skip content-type and content-encoding as they are fixed
350                if key != "content-type" && key != "content-encoding" {
351                    headers.insert(key, value.trim().to_string());
352                }
353            }
354        }
355    }
356
357    /// Get the service name from environment variables
358    ///
359    /// This function tries to get the service name from:
360    /// 1. OTEL_SERVICE_NAME
361    /// 2. AWS_LAMBDA_FUNCTION_NAME
362    /// 3. Falls back to "unknown-service" if neither is set
363    fn get_service_name() -> String {
364        env::var("OTEL_SERVICE_NAME")
365            .or_else(|_| env::var("AWS_LAMBDA_FUNCTION_NAME"))
366            .unwrap_or_else(|_| "unknown-service".to_string())
367    }
368}
369
370#[async_trait]
371impl SpanExporter for OtlpStdoutSpanExporter {
372    /// Export spans to stdout in OTLP format
373    ///
374    /// This function:
375    /// 1. Converts spans to OTLP format
376    /// 2. Serializes them to protobuf
377    /// 3. Compresses the data with GZIP
378    /// 4. Base64 encodes the result
379    /// 5. Writes a JSON object to stdout
380    ///
381    /// # Arguments
382    ///
383    /// * `batch` - A vector of spans to export
384    ///
385    /// # Returns
386    ///
387    /// Returns a resolved future with `Ok(())` if the export was successful, or a `TraceError` if it failed
388    fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, Result<(), OTelSdkError>> {
389        // Do all work synchronously
390        let result = (|| {
391            // Convert spans to OTLP format
392            let resource = self
393                .resource
394                .clone()
395                .unwrap_or_else(|| opentelemetry_sdk::Resource::builder_empty().build());
396            let resource_attrs = ResourceAttributesWithSchema::from(&resource);
397            let resource_spans = group_spans_by_resource_and_scope(batch, &resource_attrs);
398            let request = ExportTraceServiceRequest { resource_spans };
399
400            // Serialize to protobuf
401            let proto_bytes = request.encode_to_vec();
402
403            // Compress with GZIP
404            let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.gzip_level as u32));
405            encoder
406                .write_all(&proto_bytes)
407                .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
408            let compressed_bytes = encoder
409                .finish()
410                .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
411
412            // Base64 encode
413            let payload = base64_engine.encode(compressed_bytes);
414
415            // Prepare the output
416            let output_data = ExporterOutput {
417                version: VERSION,
418                source: Self::get_service_name(),
419                endpoint: DEFAULT_ENDPOINT,
420                method: "POST",
421                content_type: "application/x-protobuf",
422                content_encoding: "gzip",
423                headers: Self::parse_headers(),
424                payload,
425                base64: true,
426            };
427
428            // Write using the output implementation
429            self.output.write_line(
430                &serde_json::to_string(&output_data)
431                    .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?,
432            )?;
433
434            Ok(())
435        })();
436
437        // Return a resolved future with the result
438        Box::pin(std::future::ready(result))
439    }
440
441    /// Shuts down the exporter
442    ///
443    /// This is a no-op for stdout export as no cleanup is needed.
444    ///
445    /// # Returns
446    ///
447    /// Returns `Ok(())` as there is nothing to clean up.
448    fn shutdown(&mut self) -> Result<(), OTelSdkError> {
449        Ok(())
450    }
451
452    /// Force flushes any pending spans
453    ///
454    /// This is a no-op for stdout export as spans are written immediately.
455    ///
456    /// # Returns
457    ///
458    /// Returns `Ok(())` as there is nothing to flush.
459    fn force_flush(&mut self) -> Result<(), OTelSdkError> {
460        Ok(())
461    }
462
463    /// Sets the resource for this exporter.
464    ///
465    /// This method stores a clone of the provided resource to be used when exporting spans.
466    /// The resource represents the entity producing telemetry and will be included in the
467    /// exported trace data.
468    ///
469    /// # Arguments
470    ///
471    /// * `resource` - The resource to associate with this exporter
472    fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
473        self.resource = Some(<opentelemetry_sdk::Resource as Into<Resource>>::into(
474            resource.clone(),
475        ));
476    }
477}
478
// In doctest builds only: include the README so its code samples are compiled
// and run as doctests, under a module named `readme`.
#[cfg(doctest)]
doc_comment::doctest!("../README.md", readme);
488
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::{
        trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
        InstrumentationScope, KeyValue,
    };
    use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks};
    use serde_json::Value;
    use std::sync::{Mutex, MutexGuard};
    use std::time::SystemTime;

    /// Serializes the tests that mutate process-wide environment variables.
    ///
    /// The test harness runs tests on several threads within one process, so
    /// without this lock one test could change a variable between another
    /// test's `set_var` and its assertion.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires [`ENV_LOCK`]. Must not be held across an `.await`.
    fn lock_env() -> MutexGuard<'static, ()> {
        // The mutex guards no data, so a poisoned lock (another env test
        // panicked) is still safe to reuse; recovering keeps one failure from
        // cascading into unrelated tests.
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Builds a minimal, fully populated span for export tests.
    fn create_test_span() -> SpanData {
        let trace_id_bytes = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42];
        let span_id_bytes = [0, 0, 0, 0, 0, 0, 0, 123];
        let parent_id_bytes = [0, 0, 0, 0, 0, 0, 0, 42];

        let span_context = SpanContext::new(
            TraceId::from_bytes(trace_id_bytes),
            SpanId::from_bytes(span_id_bytes),
            TraceFlags::default(),
            false,
            TraceState::default(),
        );

        SpanData {
            span_context,
            parent_span_id: SpanId::from_bytes(parent_id_bytes),
            span_kind: SpanKind::Client,
            name: "test-span".into(),
            start_time: SystemTime::UNIX_EPOCH,
            end_time: SystemTime::UNIX_EPOCH,
            attributes: vec![KeyValue::new("test.key", "test-value")],
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: Status::Ok,
            instrumentation_scope: InstrumentationScope::builder("test-library")
                .with_version("1.0.0")
                .with_schema_url("https://opentelemetry.io/schema/1.0.0")
                .build(),
        }
    }

    /// Exports `spans` through `exporter` and returns the line captured by `output`.
    async fn export_and_capture(
        exporter: &mut OtlpStdoutSpanExporter,
        output: &TestOutput,
        spans: &[SpanData],
    ) -> String {
        exporter
            .export(spans.to_vec())
            .await
            .expect("export should succeed");
        output
            .get_output()
            .pop()
            .expect("export should emit one line")
    }

    /// Exports `spans` through a fresh capturing exporter forced to `level`.
    async fn export_at_level(level: u8, spans: &[SpanData]) -> String {
        let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
        exporter.gzip_level = level;
        export_and_capture(&mut exporter, &output, spans).await
    }

    /// Returns the base64-decoded (still gzipped) payload of one output line.
    fn payload_bytes(json_str: &str) -> Vec<u8> {
        let json: Value = serde_json::from_str(json_str).unwrap();
        let payload = json["payload"].as_str().unwrap();
        base64_engine.decode(payload).unwrap()
    }

    // Helper function to extract the size of the base64-decoded payload
    fn extract_payload_size(json_str: &str) -> usize {
        payload_bytes(json_str).len()
    }

    /// Decodes one output line all the way back to the OTLP request.
    fn decode_request(json_str: &str) -> ExportTraceServiceRequest {
        let compressed = payload_bytes(json_str);

        let mut decoder = flate2::read::GzDecoder::new(&compressed[..]);
        let mut decompressed = Vec::new();
        std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();

        ExportTraceServiceRequest::decode(&*decompressed).unwrap()
    }

    // Helper function to decode the payload and count the number of spans
    fn decode_and_count_spans(json_str: &str) -> usize {
        decode_request(json_str)
            .resource_spans
            .iter()
            .flat_map(|resource_span| resource_span.scope_spans.iter())
            .map(|scope_span| scope_span.spans.len())
            .sum()
    }

    #[test]
    fn test_parse_headers() {
        let _env = lock_env();

        std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", "key1=value1,key2=value2");
        std::env::set_var(
            "OTEL_EXPORTER_OTLP_TRACES_HEADERS",
            "key2=override,key3=value3",
        );

        let headers = OtlpStdoutSpanExporter::parse_headers();

        // Clean up before asserting so a failure does not leak state.
        std::env::remove_var("OTEL_EXPORTER_OTLP_HEADERS");
        std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS");

        assert_eq!(headers.get("key1").unwrap(), "value1");
        assert_eq!(headers.get("key2").unwrap(), "override");
        assert_eq!(headers.get("key3").unwrap(), "value3");
    }

    #[test]
    fn test_service_name_resolution() {
        let _env = lock_env();

        // Test OTEL_SERVICE_NAME priority
        std::env::set_var("OTEL_SERVICE_NAME", "otel-service");
        std::env::set_var("AWS_LAMBDA_FUNCTION_NAME", "lambda-function");
        assert_eq!(OtlpStdoutSpanExporter::get_service_name(), "otel-service");

        // Test AWS_LAMBDA_FUNCTION_NAME fallback
        std::env::remove_var("OTEL_SERVICE_NAME");
        assert_eq!(
            OtlpStdoutSpanExporter::get_service_name(),
            "lambda-function"
        );

        // Test default value
        std::env::remove_var("AWS_LAMBDA_FUNCTION_NAME");
        assert_eq!(
            OtlpStdoutSpanExporter::get_service_name(),
            "unknown-service"
        );
    }

    #[test]
    fn test_compression_level_from_env() {
        let _env = lock_env();

        // Test with valid compression level
        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "3");
        assert_eq!(
            OtlpStdoutSpanExporter::get_compression_level_from_env(),
            Some(3)
        );

        // Test with invalid compression level (>9)
        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "10");
        assert_eq!(
            OtlpStdoutSpanExporter::get_compression_level_from_env(),
            None
        );

        // Test with non-numeric value
        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "invalid");
        assert_eq!(
            OtlpStdoutSpanExporter::get_compression_level_from_env(),
            None
        );

        // Test with unset variable
        std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);
        assert_eq!(
            OtlpStdoutSpanExporter::get_compression_level_from_env(),
            None
        );
    }

    #[test]
    fn test_new_uses_env_compression_level() {
        let _env = lock_env();

        // Set environment variable
        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "3");
        let exporter = OtlpStdoutSpanExporter::new();
        assert_eq!(exporter.gzip_level, 3);

        // Test with unset variable (should use default)
        std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);
        let exporter = OtlpStdoutSpanExporter::new();
        assert_eq!(exporter.gzip_level, DEFAULT_COMPRESSION_LEVEL);
    }

    #[test]
    fn test_with_gzip_level_overrides_env() {
        let _env = lock_env();

        // Set environment variable
        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "3");

        // Explicit level should override environment
        let exporter = OtlpStdoutSpanExporter::with_gzip_level(8);

        // Clean up
        std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);

        assert_eq!(exporter.gzip_level, 8);
    }

    #[tokio::test]
    async fn test_compression_level_affects_output_size() {
        // Create a large span batch to make compression differences more noticeable
        let spans: Vec<SpanData> = (0..100i64)
            .map(|i| {
                let mut span = create_test_span();
                // Add unique attributes to each span to increase data size
                span.attributes.push(KeyValue::new("index", i));
                span.attributes.push(KeyValue::new("data", "a".repeat(100)));
                span
            })
            .collect();

        // Levels are forced on each exporter, so the environment is irrelevant here.
        let no_compression_line = export_at_level(0, &spans).await;
        let medium_compression_line = export_at_level(5, &spans).await;
        let max_compression_line = export_at_level(9, &spans).await;

        let no_compression_size = extract_payload_size(&no_compression_line);
        let medium_compression_size = extract_payload_size(&medium_compression_line);
        let max_compression_size = extract_payload_size(&max_compression_line);

        // Verify that higher compression levels result in smaller payloads
        assert!(no_compression_size > medium_compression_size,
            "Medium compression (level 5) should produce smaller output than no compression (level 0). Got {} vs {}",
            medium_compression_size, no_compression_size);

        assert!(medium_compression_size >= max_compression_size,
            "Maximum compression (level 9) should produce output no larger than medium compression (level 5). Got {} vs {}",
            max_compression_size, medium_compression_size);

        // Verify that all outputs can be properly decoded and contain the same data
        assert_eq!(
            decode_and_count_spans(&no_compression_line),
            spans.len(),
            "No compression output should contain all spans"
        );
        assert_eq!(
            decode_and_count_spans(&medium_compression_line),
            spans.len(),
            "Medium compression output should contain all spans"
        );
        assert_eq!(
            decode_and_count_spans(&max_compression_line),
            spans.len(),
            "Maximum compression output should contain all spans"
        );
    }

    #[tokio::test]
    async fn test_export_single_span() {
        let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
        let span = create_test_span();

        let result = exporter.export(vec![span]).await;
        assert!(result.is_ok());

        let output = output.get_output();
        assert_eq!(output.len(), 1);

        // Parse and verify the output
        let json: Value = serde_json::from_str(&output[0]).unwrap();
        assert_eq!(json["__otel_otlp_stdout"], VERSION);
        assert_eq!(json["method"], "POST");
        assert_eq!(json["content-type"], "application/x-protobuf");
        assert_eq!(json["content-encoding"], "gzip");
        assert_eq!(json["base64"], true);

        // Verify the payload is valid base64, gunzips, and is valid OTLP protobuf
        let request = decode_request(&output[0]);
        assert_eq!(request.resource_spans.len(), 1);
    }

    #[tokio::test]
    async fn test_export_empty_batch() {
        let mut exporter = OtlpStdoutSpanExporter::new();
        let result = exporter.export(vec![]).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_gzip_level_configuration() {
        let exporter = OtlpStdoutSpanExporter::with_gzip_level(9);
        assert_eq!(exporter.gzip_level, 9);
    }

    #[tokio::test]
    async fn test_env_var_affects_export_compression() {
        // Create test data
        let spans = vec![create_test_span()];

        // The compression env var is only read when an exporter is constructed,
        // so every exporter is built inside one critical section and the lock
        // is released before anything is awaited.
        let (mut env_0, mut env_9, mut env_invalid, mut explicit) = {
            let _env = lock_env();

            // Environment variable set to level 0 (no compression)
            std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "0");
            let env_0 = OtlpStdoutSpanExporter::with_test_output();

            // Environment variable set to level 9 (max compression)
            std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "9");
            let env_9 = OtlpStdoutSpanExporter::with_test_output();

            // Invalid environment variable (should use default)
            std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "invalid");
            let env_invalid = OtlpStdoutSpanExporter::with_test_output();

            // Explicit level (should override environment variable)
            std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "0");
            let mut explicit = OtlpStdoutSpanExporter::with_test_output();
            explicit.0.gzip_level = 9;

            // Clean up
            std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);

            (env_0, env_9, env_invalid, explicit)
        };

        assert_eq!(env_0.0.gzip_level, 0);
        assert_eq!(env_9.0.gzip_level, 9);
        assert_eq!(env_invalid.0.gzip_level, DEFAULT_COMPRESSION_LEVEL);

        let env_size_0 =
            extract_payload_size(&export_and_capture(&mut env_0.0, &env_0.1, &spans).await);
        let env_size_9 =
            extract_payload_size(&export_and_capture(&mut env_9.0, &env_9.1, &spans).await);

        // Verify that the environment variable affected the compression level
        assert!(env_size_0 > env_size_9,
            "Environment variable COMPRESSION_LEVEL=9 should produce smaller output than COMPRESSION_LEVEL=0. Got {} vs {}",
            env_size_9, env_size_0);

        // An invalid value must still yield a working exporter.
        let _ = export_and_capture(&mut env_invalid.0, &env_invalid.1, &spans).await;

        let explicit_size =
            extract_payload_size(&export_and_capture(&mut explicit.0, &explicit.1, &spans).await);

        // Verify that explicit level overrides environment variable
        assert!(env_size_0 > explicit_size,
            "Explicit level 9 should produce smaller output than environment variable level 0. Got {} vs {}",
            explicit_size, env_size_0);
    }
}
816}