Skip to main content

otlp_stdout_span_exporter/
lib.rs

1//! A span exporter that writes OpenTelemetry spans to stdout in OTLP format.
2//!
3//! This crate provides an implementation of OpenTelemetry's [`SpanExporter`] that writes spans to stdout
4//! in OTLP (OpenTelemetry Protocol) format. It is particularly useful in serverless environments like
5//! AWS Lambda where writing to stdout is a common pattern for exporting telemetry data.
6//!
7//! # Features
8//!
9//! - Uses OTLP Protobuf serialization for efficient encoding
10//! - Applies GZIP compression with configurable levels
11//! - Detects service name from environment variables
12//! - Supports custom headers via environment variables
13//! - Supports writing to stdout or named pipe
14//! - Consistent JSON output format
15//!
16//! # Example
17//!
18//! ```rust,no_run
19//! use opentelemetry::global;
20//! use opentelemetry::trace::Tracer;
21//! use opentelemetry_sdk::{trace::SdkTracerProvider, Resource};
22//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
23//!
24//! #[tokio::main]
25//! async fn main() {
26//!     // Create a new stdout exporter with default configuration (stdout output)
27//!     let exporter = OtlpStdoutSpanExporter::default();
28//!
29//!     // Or create one that writes to a named pipe
30//!     let pipe_exporter = OtlpStdoutSpanExporter::builder()
31//!         .pipe(true)  // Will write to /tmp/otlp-stdout-span-exporter.pipe
32//!         .build();
33//!
34//!     // Create a new tracer provider with batch export
35//!     let provider = SdkTracerProvider::builder()
36//!         .with_batch_exporter(exporter)
37//!         .build();
38//!
39//!     // Register the provider with the OpenTelemetry global API
40//!     global::set_tracer_provider(provider.clone());
41//!
42//!     // Create a tracer
43//!     let tracer = global::tracer("my-service");
44//!
45//!     // Create spans
46//!     tracer.in_span("parent-operation", |_cx| {
47//!         println!("Doing work...");
48//!         
49//!         // Create nested spans
50//!         tracer.in_span("child-operation", |_cx| {
51//!             println!("Doing more work...");
52//!         });
53//!     });
54//!     
55//!     // Flush the provider to ensure all spans are exported
56//!     if let Err(err) = provider.force_flush() {
57//!         println!("Error flushing provider: {:?}", err);
58//!     }
59//! }
60//! ```
61//!
62//! # Environment Variables
63//!
64//! The exporter respects the following environment variables:
65//!
66//! - `OTEL_SERVICE_NAME`: Service name to use in output
67//! - `AWS_LAMBDA_FUNCTION_NAME`: Fallback service name (if `OTEL_SERVICE_NAME` not set)
68//! - `OTEL_EXPORTER_OTLP_HEADERS`: Global headers for OTLP export
69//! - `OTEL_EXPORTER_OTLP_TRACES_HEADERS`: Trace-specific headers (takes precedence if conflicting with `OTEL_EXPORTER_OTLP_HEADERS`)
70//! - `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL`: GZIP compression level (0-9, default: 6)
71//! - `OTLP_STDOUT_SPAN_EXPORTER_OUTPUT_TYPE`: Output type ("pipe" or "stdout", default: "stdout")
72//!
73//! # Configuration Precedence
74//!
75//! All configuration values follow this strict precedence order:
76//!
77//! 1. Environment variables (highest precedence)
78//! 2. Constructor parameters
79//! 3. Default values (lowest precedence)
80//!
81//! For example, when determining the output type:
82//!
83//! ```rust
84//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
85//!
86//! // This will use OTLP_STDOUT_SPAN_EXPORTER_OUTPUT_TYPE if set,
87//! // otherwise it will write to a named pipe as specified in the constructor
88//! let pipe_exporter = OtlpStdoutSpanExporter::builder()
89//!     .pipe(true)
90//!     .build();
91//!
92//! // This will use the environment variable if set, or default to stdout
93//! let default_exporter = OtlpStdoutSpanExporter::default();
94//! ```
95//!
96//! # Output Format
97//!
98//! The exporter writes each batch of spans as a JSON object to stdout or the named pipe:
99//!
100//! ```json
101//! {
102//!   "__otel_otlp_stdout": "0.1.0",
103//!   "source": "my-service",
104//!   "endpoint": "http://localhost:4318/v1/traces",
105//!   "method": "POST",
106//!   "content-type": "application/x-protobuf",
107//!   "content-encoding": "gzip",
108//!   "headers": {
109//!     "api-key": "secret123",
110//!     "custom-header": "value"
111//!   },
112//!   "payload": "<base64-encoded-gzipped-protobuf>",
113//!   "base64": true
114//! }
115//! ```
116
117use base64::{engine::general_purpose::STANDARD as base64_engine, Engine};
118use bon::bon;
119use flate2::{write::GzEncoder, Compression};
120use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
121use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
122use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
123use opentelemetry_sdk::error::OTelSdkResult;
124use opentelemetry_sdk::resource::Resource;
125use opentelemetry_sdk::{
126    error::OTelSdkError,
127    trace::{SpanData, SpanExporter},
128};
129use prost::Message;
130use serde::{Deserialize, Serialize};
131use std::{
132    collections::HashMap,
133    env,
134    fmt::{self, Display},
135    fs::OpenOptions,
136    io::{self, Write},
137    path::PathBuf,
138    result::Result,
139    str::FromStr,
140    sync::{Arc, Mutex},
141};
142
143mod constants;
144use constants::{defaults, env_vars};
145
146// Make the constants module and its sub-modules publicly available
147pub mod consts {
148    //! Constants used by the exporter.
149    //!
150    //! This module provides constants for environment variables,
151    //! default values, and resource attributes.
152
153    pub use crate::constants::defaults;
154    pub use crate::constants::env_vars;
155    pub use crate::constants::resource_attributes;
156}
157
/// Package version taken from Cargo (`CARGO_PKG_VERSION`) at compile time;
/// written into the `__otel_otlp_stdout` field of every exported record.
const VERSION: &str = env!("CARGO_PKG_VERSION");
159
160/// Log level for the exported spans
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
162pub enum LogLevel {
163    /// Debug level
164    Debug,
165    /// Info level (default)
166    #[default]
167    Info,
168    /// Warning level
169    Warn,
170    /// Error level (least verbose)
171    Error,
172}
173
174impl FromStr for LogLevel {
175    type Err = String;
176
177    fn from_str(s: &str) -> Result<Self, Self::Err> {
178        match s.to_lowercase().as_str() {
179            "debug" => Ok(LogLevel::Debug),
180            "info" => Ok(LogLevel::Info),
181            "warn" | "warning" => Ok(LogLevel::Warn),
182            "error" => Ok(LogLevel::Error),
183            _ => Err(format!("Invalid log level: {s}")),
184        }
185    }
186}
187
188impl Display for LogLevel {
189    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190        match self {
191            LogLevel::Debug => write!(f, "DEBUG"),
192            LogLevel::Info => write!(f, "INFO"),
193            LogLevel::Warn => write!(f, "WARN"),
194            LogLevel::Error => write!(f, "ERROR"),
195        }
196    }
197}
198
/// Trait for output handling
///
/// This trait defines the interface for writing output lines. It is implemented
/// by the standard output handler, the named pipe handler and the in-memory
/// buffer handler in this file.
pub trait Output: Send + Sync + std::fmt::Debug + std::any::Any {
    /// Writes a single line of output
    ///
    /// # Arguments
    ///
    /// * `line` - The line to write
    ///
    /// # Errors
    ///
    /// Returns an [`OTelSdkError`] if the write failed.
    fn write_line(&self, line: &str) -> Result<(), OTelSdkError>;

    /// Checks if the output target is a named pipe.
    ///
    /// Returns `true` if the output is configured to write to a named pipe,
    /// `false` otherwise (e.g., stdout).
    fn is_pipe(&self) -> bool;

    /// Performs a "touch" operation on the output target, primarily for named pipes.
    ///
    /// For named pipes, this should open and immediately close the pipe for writing
    /// to generate an EOF signal for readers.
    /// For other output types (like stdout), this is typically a no-op.
    ///
    /// # Errors
    ///
    /// The default implementation always returns `Ok(())`. Implementations
    /// that really open the pipe return an [`OTelSdkError`] when opening fails.
    fn touch_pipe(&self) -> Result<(), OTelSdkError> {
        // Default implementation is a no-op
        Ok(())
    }
}
236
237/// Standard output implementation that writes to stdout
238#[derive(Debug, Default)]
239struct StdOutput;
240
241impl Output for StdOutput {
242    fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
243        // Get a locked stdout handle once
244        let stdout = io::stdout();
245        let mut handle = stdout.lock();
246
247        // Write the line and a newline in one operation
248        writeln!(handle, "{line}").map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
249
250        Ok(())
251    }
252
253    fn is_pipe(&self) -> bool {
254        false // Stdout is not a pipe
255    }
256}
257
258/// Output implementation that writes to a named pipe
259#[derive(Debug)]
260struct NamedPipeOutput {
261    path: PathBuf,
262}
263
264impl NamedPipeOutput {
265    fn new() -> Result<Self, OTelSdkError> {
266        let path_buf = PathBuf::from(defaults::PIPE_PATH);
267        if !path_buf.exists() {
268            log::warn!("Named pipe does not exist: {}", defaults::PIPE_PATH);
269            // On Unix systems we could create it with mkfifo but this would need cfg platform specifics
270        }
271
272        Ok(Self { path: path_buf })
273    }
274}
275
276impl Output for NamedPipeOutput {
277    fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
278        // Open the pipe for writing
279        let mut file = OpenOptions::new()
280            .write(true)
281            .open(&self.path)
282            .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to open pipe: {e}")))?;
283
284        // Write line with newline
285        writeln!(file, "{line}")
286            .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to write to pipe: {e}")))?;
287
288        Ok(())
289    }
290
291    fn is_pipe(&self) -> bool {
292        true // This implementation is for named pipes
293    }
294
295    fn touch_pipe(&self) -> Result<(), OTelSdkError> {
296        // Open the pipe for writing and immediately close it (RAII handles close)
297        let _file = OpenOptions::new()
298            .write(true)
299            .open(&self.path)
300            .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to touch pipe: {e}")))?;
301        Ok(())
302    }
303}
304
305/// An Output implementation that writes lines to an internal buffer.
306#[derive(Clone, Default)]
307pub struct BufferOutput {
308    buffer: Arc<Mutex<Vec<String>>>,
309}
310
311impl BufferOutput {
312    /// Creates a new, empty BufferOutput.
313    pub fn new() -> Self {
314        Self::default()
315    }
316
317    /// Retrieves all lines currently in the buffer, clearing the buffer afterwards.
318    pub fn take_lines(&self) -> Result<Vec<String>, OTelSdkError> {
319        let mut guard = self.buffer.lock().map_err(|e| {
320            OTelSdkError::InternalFailure(format!(
321                "Failed to lock buffer mutex for take_lines: {e}"
322            ))
323        })?;
324        Ok(std::mem::take(&mut *guard)) // Efficiently swaps the Vec with an empty one and wraps in Ok
325    }
326}
327
328impl Output for BufferOutput {
329    fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
330        let mut guard = self.buffer.lock().map_err(|e| {
331            OTelSdkError::InternalFailure(format!(
332                "Failed to lock buffer mutex for write_line: {e}"
333            ))
334        })?;
335        guard.push(line.to_string());
336        Ok(())
337    }
338
339    fn is_pipe(&self) -> bool {
340        false // BufferOutput is not a pipe
341    }
342}
343
344// Need to implement Debug manually as Mutex doesn't implement Debug
345impl fmt::Debug for BufferOutput {
346    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347        // Avoid locking the mutex in debug formatting if possible, or provide minimal info
348        f.debug_struct("BufferOutput").finish_non_exhaustive()
349    }
350}
351
352/// Helper function to create output based on type
353fn create_output(use_pipe: bool) -> Arc<dyn Output> {
354    if use_pipe {
355        match NamedPipeOutput::new() {
356            Ok(output) => Arc::new(output),
357            Err(e) => {
358                log::warn!("Failed to create named pipe output: {e}, falling back to stdout");
359                Arc::new(StdOutput)
360            }
361        }
362    } else {
363        Arc::new(StdOutput)
364    }
365}
366
367/// Output format for the OTLP stdout exporter
368///
369/// This struct defines the JSON structure that will be written to stdout
370/// for each batch of spans.
371#[derive(Debug, Serialize, Deserialize)]
372pub struct ExporterOutput {
373    /// Version identifier for the output format
374    #[serde(rename = "__otel_otlp_stdout")]
375    pub version: String,
376    /// Service name that generated the spans
377    pub source: String,
378    /// OTLP endpoint (always http://localhost:4318/v1/traces)
379    pub endpoint: String,
380    /// HTTP method (always POST)
381    pub method: String,
382    /// Content type (always application/x-protobuf)
383    #[serde(rename = "content-type")]
384    pub content_type: String,
385    /// Content encoding (always gzip)
386    #[serde(rename = "content-encoding")]
387    pub content_encoding: String,
388    /// Custom headers from environment variables
389    #[serde(skip_serializing_if = "ExporterOutput::is_headers_empty")]
390    pub headers: Option<HashMap<String, String>>,
391    /// Base64-encoded, gzipped, protobuf-serialized span data
392    pub payload: String,
393    /// Whether the payload is base64 encoded (always true)
394    pub base64: bool,
395    /// Log level for filtering (optional)
396    #[serde(skip_serializing_if = "Option::is_none")]
397    pub level: Option<String>,
398}
399
400impl ExporterOutput {
401    /// Helper function for serde to skip serializing empty headers
402    fn is_headers_empty(headers: &Option<HashMap<String, String>>) -> bool {
403        headers.as_ref().is_none_or(|h| h.is_empty())
404    }
405}
406
/// A span exporter that writes spans to stdout in OTLP format
///
/// This exporter implements the OpenTelemetry [`SpanExporter`] trait and writes spans
/// to its configured [`Output`] (stdout by default) in OTLP format with Protobuf
/// serialization and GZIP compression.
///
/// # Features
///
/// - Configurable GZIP compression level (0-9)
/// - Environment variable support for service name and headers
/// - One JSON record per exported batch of spans
/// - Base64 encoding of compressed data
///
/// # Example
///
/// ```rust,no_run
/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
///
/// // Create an exporter with maximum compression
/// let exporter = OtlpStdoutSpanExporter::builder()
///     .compression_level(9)
///     .build();
/// ```
#[derive(Debug)]
pub struct OtlpStdoutSpanExporter {
    /// GZIP compression level (0-9)
    compression_level: u8,
    /// Optional resource to be included with all spans
    resource: Option<Resource>,
    /// Optional headers copied into every exported record
    headers: Option<HashMap<String, String>>,
    /// Output implementation (stdout or named pipe)
    output: Arc<dyn Output>,
    /// Optional log level for the exported spans
    level: Option<LogLevel>,
}
443
impl Default for OtlpStdoutSpanExporter {
    /// Equivalent to `OtlpStdoutSpanExporter::builder().build()`: no builder
    /// arguments are supplied, so every setting comes from the environment
    /// or the crate defaults.
    fn default() -> Self {
        Self::builder().build()
    }
}
449#[bon]
450impl OtlpStdoutSpanExporter {
451    /// Create a new `OtlpStdoutSpanExporter` with default configuration.
452    ///
453    /// This uses a GZIP compression level of 6 unless overridden by an environment variable.
454    ///
455    /// # Output Type
456    ///
457    /// The output type is determined in the following order:
458    ///
459    /// 1. The `OTLP_STDOUT_SPAN_EXPORTER_OUTPUT_TYPE` environment variable if set ("pipe" or "stdout")
460    /// 2. Constructor parameter (pipe)
461    /// 3. Default (stdout)
462    ///
463    /// # Example
464    ///
465    /// ```
466    /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
467    ///
468    /// let exporter = OtlpStdoutSpanExporter::default();
469    /// ```
470    #[builder]
471    pub fn new(
472        compression_level: Option<u8>,
473        resource: Option<Resource>,
474        headers: Option<HashMap<String, String>>,
475        output: Option<Arc<dyn Output>>,
476        level: Option<LogLevel>,
477        pipe: Option<bool>,
478    ) -> Self {
479        // Set gzip_level with proper precedence (env var > constructor param > default)
480        let compression_level = match env::var(env_vars::COMPRESSION_LEVEL) {
481            Ok(value) => match value.parse::<u8>() {
482                Ok(level) if level <= 9 => level,
483                Ok(level) => {
484                    log::warn!(
485                        "Invalid value in {}: {} (must be 0-9), using fallback",
486                        env_vars::COMPRESSION_LEVEL,
487                        level
488                    );
489                    compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
490                }
491                Err(_) => {
492                    log::warn!(
493                        "Failed to parse {}: {}, using fallback",
494                        env_vars::COMPRESSION_LEVEL,
495                        value
496                    );
497                    compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
498                }
499            },
500            Err(_) => {
501                // No environment variable, use parameter or default
502                compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
503            }
504        };
505
506        // Combine constructor headers with environment headers, giving priority to env vars
507        let headers = match headers {
508            Some(constructor_headers) => {
509                if let Some(env_headers) = Self::parse_headers() {
510                    // Merge, with env headers taking precedence
511                    let mut merged = constructor_headers;
512                    merged.extend(env_headers);
513                    Some(merged)
514                } else {
515                    // No env headers, use constructor headers
516                    Some(constructor_headers)
517                }
518            }
519            None => Self::parse_headers(), // Use env headers only
520        };
521
522        // Set log level with proper precedence (env var > constructor param > default)
523        let level = match env::var(env_vars::LOG_LEVEL) {
524            Ok(value) => match LogLevel::from_str(&value) {
525                Ok(log_level) => Some(log_level),
526                Err(e) => {
527                    log::warn!(
528                        "Invalid log level in {}: {}, using fallback",
529                        env_vars::LOG_LEVEL,
530                        e
531                    );
532                    level
533                }
534            },
535            Err(_) => {
536                // No environment variable, use parameter
537                level
538            }
539        };
540
541        // Determine output type with proper precedence (env var > constructor > default)
542        let use_pipe = match env::var(env_vars::OUTPUT_TYPE) {
543            Ok(value) => value.to_lowercase() == "pipe",
544            Err(_) => pipe.unwrap_or(false),
545        };
546
547        // Create output implementation
548        let output = output.unwrap_or_else(|| create_output(use_pipe));
549
550        Self {
551            compression_level,
552            resource,
553            headers,
554            output,
555            level,
556        }
557    }
558
559    /// Get the service name from environment variables.
560    ///
561    /// The service name is determined in the following order:
562    ///
563    /// 1. OTEL_SERVICE_NAME
564    /// 2. AWS_LAMBDA_FUNCTION_NAME
565    /// 3. "unknown-service" (fallback)
566    fn get_service_name() -> String {
567        env::var(env_vars::SERVICE_NAME)
568            .or_else(|_| env::var(env_vars::AWS_LAMBDA_FUNCTION_NAME))
569            .unwrap_or_else(|_| defaults::SERVICE_NAME.to_string())
570    }
571
572    #[cfg(test)]
573    fn with_test_output() -> (Self, Arc<TestOutput>) {
574        let output = Arc::new(TestOutput::new());
575
576        // Use the standard builder() method and explicitly set the output
577        let exporter = Self::builder().output(output.clone()).build();
578
579        (exporter, output)
580    }
581
582    /// Parse headers from environment variables
583    ///
584    /// This function reads headers from both global and trace-specific
585    /// environment variables, with trace-specific headers taking precedence.
586    fn parse_headers() -> Option<HashMap<String, String>> {
587        // Function to get and parse headers from an env var
588        let get_headers = |var_name: &str| -> Option<HashMap<String, String>> {
589            env::var(var_name).ok().map(|header_str| {
590                let mut map = HashMap::new();
591                Self::parse_header_string(&header_str, &mut map);
592                map
593            })
594        };
595
596        // Try to get headers from both env vars
597        let global_headers = get_headers("OTEL_EXPORTER_OTLP_HEADERS");
598        let trace_headers = get_headers("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
599
600        // If no headers were found in either env var, return None
601        if global_headers.is_none() && trace_headers.is_none() {
602            return None;
603        }
604
605        // Create a merged map, with trace headers taking precedence
606        let mut result = HashMap::new();
607
608        // Add global headers first (if any)
609        if let Some(headers) = global_headers {
610            result.extend(headers);
611        }
612
613        // Add trace-specific headers (if any) - these will override any duplicates
614        if let Some(headers) = trace_headers {
615            result.extend(headers);
616        }
617
618        // Return None for empty map, otherwise Some
619        if result.is_empty() {
620            None
621        } else {
622            Some(result)
623        }
624    }
625
626    /// Parse a header string in the format key1=value1,key2=value2
627    ///
628    /// # Arguments
629    ///
630    /// * `header_str` - The header string to parse
631    /// * `headers` - The map to store parsed headers in
632    fn parse_header_string(header_str: &str, headers: &mut HashMap<String, String>) {
633        for pair in header_str.split(',') {
634            if let Some((key, value)) = pair.split_once('=') {
635                let key = key.trim().to_lowercase();
636                // Skip content-type and content-encoding as they are fixed
637                if key != "content-type" && key != "content-encoding" {
638                    headers.insert(key, value.trim().to_string());
639                }
640            }
641        }
642    }
643}
644
645impl SpanExporter for OtlpStdoutSpanExporter {
646    /// Export spans to stdout in OTLP format
647    ///
648    /// This function:
649    /// 1. Converts spans to OTLP format
650    /// 2. Serializes them to protobuf
651    /// 3. Compresses the data with GZIP
652    /// 4. Base64 encodes the result
653    /// 5. Writes a JSON object to stdout
654    ///
655    /// # Arguments
656    ///
657    /// * `batch` - A vector of spans to export
658    ///
659    /// # Returns
660    ///
661    /// Returns a resolved future with `Ok(())` if the export was successful, or a `TraceError` if it failed
662    fn export(
663        &self,
664        batch: Vec<SpanData>,
665    ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
666        // Check for empty batch and pipe output configuration
667        if batch.is_empty() && self.output.is_pipe() {
668            // Perform the "pipe touch" operation: open for writing and immediately close.
669            let touch_result = self.output.touch_pipe();
670            return Box::pin(std::future::ready(touch_result));
671        }
672
673        // Original export logic for non-empty batches or stdout output
674        let result = (|| {
675            // Convert spans to OTLP format
676            let resource = self
677                .resource
678                .clone()
679                .unwrap_or_else(|| opentelemetry_sdk::Resource::builder_empty().build());
680            let resource_attrs = ResourceAttributesWithSchema::from(&resource);
681            let resource_spans = group_spans_by_resource_and_scope(batch, &resource_attrs);
682            let request = ExportTraceServiceRequest { resource_spans };
683
684            // Serialize to protobuf
685            let proto_bytes = request.encode_to_vec();
686
687            // Compress with GZIP
688            let mut encoder =
689                GzEncoder::new(Vec::new(), Compression::new(self.compression_level as u32));
690            encoder
691                .write_all(&proto_bytes)
692                .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
693            let compressed_bytes = encoder
694                .finish()
695                .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
696
697            // Base64 encode
698            let payload = base64_engine.encode(compressed_bytes);
699
700            // Prepare the output
701            let output_data = ExporterOutput {
702                version: VERSION.to_string(),
703                source: Self::get_service_name(),
704                endpoint: defaults::ENDPOINT.to_string(),
705                method: "POST".to_string(),
706                content_type: "application/x-protobuf".to_string(),
707                content_encoding: "gzip".to_string(),
708                headers: self.headers.clone(),
709                payload,
710                base64: true,
711                level: self.level.map(|l| l.to_string()),
712            };
713
714            // Write using the output implementation
715            self.output.write_line(
716                &serde_json::to_string(&output_data)
717                    .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?,
718            )?;
719
720            Ok(())
721        })();
722
723        // Return a resolved future with the result
724        Box::pin(std::future::ready(result))
725    }
726
727    /// Shuts down the exporter with a timeout
728    ///
729    /// This is a no-op for stdout export as no cleanup is needed.
730    /// The timeout parameter is ignored since there's nothing to flush.
731    ///
732    /// # Returns
733    ///
734    /// Returns `Ok(())` as there is nothing to clean up.
735    fn shutdown_with_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), OTelSdkError> {
736        Ok(())
737    }
738
739    /// Force flushes any pending spans
740    ///
741    /// This is a no-op for stdout export as spans are written immediately.
742    ///
743    /// # Returns
744    ///
745    /// Returns `Ok(())` as there is nothing to flush.
746    fn force_flush(&mut self) -> Result<(), OTelSdkError> {
747        Ok(())
748    }
749
750    /// Sets the resource for this exporter.
751    ///
752    /// This method stores a clone of the provided resource to be used when exporting spans.
753    /// The resource represents the entity producing telemetry and will be included in the
754    /// exported trace data.
755    ///
756    /// # Arguments
757    ///
758    /// * `resource` - The resource to associate with this exporter
759    fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
760        self.resource = Some(<opentelemetry_sdk::Resource as Into<Resource>>::into(
761            resource.clone(),
762        ));
763    }
764}
765
// Doctest-only wiring (compiled only under `cfg(doctest)`): hands the crate's
// README.md to the `doc_comment::doctest!` macro under the name `readme`.
// NOTE(review): presumably this makes the README code blocks run as doctests;
// confirm against the `doc_comment` crate documentation.
#[cfg(doctest)]
#[macro_use]
extern crate doc_comment;

#[cfg(doctest)]
use doc_comment::doctest;

#[cfg(doctest)]
doctest!("../README.md", readme);
775
/// Test-only [`Output`] that records every written line in memory.
#[cfg(test)]
#[derive(Debug, Default)]
struct TestOutput {
    buffer: Arc<Mutex<Vec<String>>>,
}

#[cfg(test)]
impl TestOutput {
    /// Creates an output with an empty buffer.
    fn new() -> Self {
        Self::default()
    }

    /// Returns a copy of all lines written so far; the buffer is left intact.
    fn get_output(&self) -> Vec<String> {
        let lines = self.buffer.lock().unwrap();
        lines.to_vec()
    }
}

#[cfg(test)]
impl Output for TestOutput {
    /// Appends `line` to the buffer; never fails (panics on a poisoned lock).
    fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
        let mut lines = self.buffer.lock().unwrap();
        lines.push(line.to_owned());
        Ok(())
    }

    /// Always `false`, so the exporter never takes the pipe-touch path.
    fn is_pipe(&self) -> bool {
        false
    }

    // `touch_pipe` is inherited from the trait's default no-op.
}
809
810#[cfg(test)]
811mod tests {
812    use super::*;
813    #[cfg(unix)]
814    use nix::{sys::stat::Mode, unistd::mkfifo};
815    use opentelemetry::{
816        trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
817        InstrumentationScope, KeyValue,
818    };
819    use opentelemetry_proto::tonic::{
820        common::v1::any_value::Value as AnyValue, trace::v1::SpanFlags,
821    };
822    use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks};
823    use serde_json::Value;
824    use serial_test::serial;
825    use std::{
826        fs::OpenOptions,
827        io::Read,
828        path::PathBuf,
829        sync::Arc,
830        thread,
831        time::{Duration, SystemTime, UNIX_EPOCH},
832    };
833
834    #[derive(Debug)]
835    struct FailingOutput;
836
837    impl Output for FailingOutput {
838        fn write_line(&self, _line: &str) -> Result<(), OTelSdkError> {
839            Err(OTelSdkError::InternalFailure(
840                "intentional test sink failure".to_string(),
841            ))
842        }
843
844        fn is_pipe(&self) -> bool {
845            false
846        }
847    }
848
849    fn create_test_span() -> SpanData {
850        let trace_id_bytes = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42];
851        let span_id_bytes = [0, 0, 0, 0, 0, 0, 0, 123];
852        let parent_id_bytes = [0, 0, 0, 0, 0, 0, 0, 42];
853
854        let span_context = SpanContext::new(
855            TraceId::from_bytes(trace_id_bytes),
856            SpanId::from_bytes(span_id_bytes),
857            TraceFlags::default(),
858            false,
859            TraceState::default(),
860        );
861
862        SpanData {
863            span_context,
864            parent_span_id: SpanId::from_bytes(parent_id_bytes),
865            parent_span_is_remote: false,
866            span_kind: SpanKind::Client,
867            name: "test-span".into(),
868            start_time: SystemTime::UNIX_EPOCH,
869            end_time: SystemTime::UNIX_EPOCH,
870            attributes: vec![KeyValue::new("test.key", "test-value")],
871            dropped_attributes_count: 0,
872            events: SpanEvents::default(),
873            links: SpanLinks::default(),
874            status: Status::Ok,
875            instrumentation_scope: InstrumentationScope::builder("test-library")
876                .with_version("1.0.0")
877                .with_schema_url("https://opentelemetry.io/schema/1.0.0")
878                .build(),
879        }
880    }
881
882    fn decode_export_request(json_str: &str) -> ExportTraceServiceRequest {
883        let json: Value = serde_json::from_str(json_str).unwrap();
884        let payload = json["payload"].as_str().unwrap();
885        let decoded = base64_engine.decode(payload).unwrap();
886
887        let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
888        let mut decompressed = Vec::new();
889        decoder.read_to_end(&mut decompressed).unwrap();
890
891        ExportTraceServiceRequest::decode(&*decompressed).unwrap()
892    }
893
/// Returns a path inside the system temp directory for a test FIFO.
///
/// The file name combines `name`, the current process id and the current
/// time in nanoseconds since the Unix epoch, so separate calls yield
/// distinct paths in practice. Nothing is created on disk.
fn unique_test_pipe_path(name: &str) -> PathBuf {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let now = since_epoch.as_nanos();
    let file_name = format!(
        "otlp-stdout-span-exporter-{name}-{}-{now}.fifo",
        std::process::id()
    );

    let mut path = std::env::temp_dir();
    path.push(file_name);
    path
}
904
905    #[cfg(unix)]
906    fn create_test_fifo(name: &str) -> PathBuf {
907        let path = unique_test_pipe_path(name);
908        let _ = std::fs::remove_file(&path);
909        mkfifo(&path, Mode::from_bits_truncate(0o600)).unwrap();
910        path
911    }
912
/// Checks that `parse_headers` merges the generic and trace-specific header
/// environment variables, with the trace-specific value winning on conflict.
///
/// Marked `#[serial]` because it mutates process-wide environment variables
/// that other tests in this module also set and read.
#[test]
#[serial]
fn test_parse_headers() {
    std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", "key1=value1,key2=value2");
    std::env::set_var(
        "OTEL_EXPORTER_OTLP_TRACES_HEADERS",
        "key2=override,key3=value3",
    );

    let headers = OtlpStdoutSpanExporter::parse_headers();

    // Restore the environment before asserting so a failed assertion cannot
    // leak these variables into later tests.
    std::env::remove_var("OTEL_EXPORTER_OTLP_HEADERS");
    std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS");

    let headers = headers.expect("headers should be Some when the environment variables are set");

    assert_eq!(headers.get("key1").unwrap(), "value1");
    assert_eq!(headers.get("key2").unwrap(), "override");
    assert_eq!(headers.get("key3").unwrap(), "value3");
}
935
/// Checks the service-name lookup order: `env_vars::SERVICE_NAME` first, then
/// `env_vars::AWS_LAMBDA_FUNCTION_NAME`, then `defaults::SERVICE_NAME`.
///
/// Marked `#[serial]` because it sets and removes process-wide environment
/// variables, which would otherwise race with other tests.
#[test]
#[serial]
fn test_service_name_resolution() {
    // Both variables set: the explicit service name wins.
    std::env::set_var(env_vars::SERVICE_NAME, "otel-service");
    std::env::set_var(env_vars::AWS_LAMBDA_FUNCTION_NAME, "lambda-function");
    assert_eq!(OtlpStdoutSpanExporter::get_service_name(), "otel-service");

    // Only the Lambda function name set: it is used as the fallback.
    std::env::remove_var(env_vars::SERVICE_NAME);
    assert_eq!(
        OtlpStdoutSpanExporter::get_service_name(),
        "lambda-function"
    );

    // Neither set: the built-in default is returned.
    std::env::remove_var(env_vars::AWS_LAMBDA_FUNCTION_NAME);
    assert_eq!(
        OtlpStdoutSpanExporter::get_service_name(),
        defaults::SERVICE_NAME
    );
}
957
/// Checks how the builder resolves the compression level: a valid value in
/// `env_vars::COMPRESSION_LEVEL` overrides the builder option, while an
/// unparsable or absent value leaves the builder option in effect.
///
/// Marked `#[serial]` because it mutates a process-wide environment variable
/// that several other tests in this module also read and write.
#[test]
#[serial]
fn test_compression_level_precedence() {
    // A valid env var takes precedence over the builder option.
    std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
    let exporter = OtlpStdoutSpanExporter::builder()
        .compression_level(7)
        .build();
    assert_eq!(exporter.compression_level, 3);

    // An invalid env var falls back to the builder option.
    std::env::set_var(env_vars::COMPRESSION_LEVEL, "invalid");
    let exporter = OtlpStdoutSpanExporter::builder()
        .compression_level(7)
        .build();
    assert_eq!(exporter.compression_level, 7);

    // With no env var the builder option is used.
    std::env::remove_var(env_vars::COMPRESSION_LEVEL);
    let exporter = OtlpStdoutSpanExporter::builder()
        .compression_level(7)
        .build();
    assert_eq!(exporter.compression_level, 7);

    // Passing the default level explicitly yields the default level.
    let exporter = OtlpStdoutSpanExporter::builder()
        .compression_level(defaults::COMPRESSION_LEVEL)
        .build();
    assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
}
987
/// Checks that `OtlpStdoutSpanExporter::default()` picks up
/// `env_vars::COMPRESSION_LEVEL` and otherwise uses `defaults::COMPRESSION_LEVEL`.
///
/// Marked `#[serial]` because it mutates a process-wide environment variable
/// shared with other tests in this module.
#[test]
#[serial]
fn test_new_uses_env_compression_level() {
    // With the variable set, the default exporter adopts its value.
    std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
    let exporter = OtlpStdoutSpanExporter::default();
    assert_eq!(exporter.compression_level, 3);

    // With the variable unset, the built-in default applies.
    std::env::remove_var(env_vars::COMPRESSION_LEVEL);
    let exporter = OtlpStdoutSpanExporter::default();
    assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
}
1000
1001    #[tokio::test]
1002    #[serial]
1003    async fn test_compression_level_affects_output_size() {
1004        // Create a large span batch to make compression differences more noticeable
1005        let mut spans = Vec::new();
1006        for i in 0..100 {
1007            let mut span = create_test_span();
1008            // Add unique attributes to each span to increase data size
1009            span.attributes.push(KeyValue::new("index", i));
1010            // Add a large attribute to make compression more effective
1011            span.attributes
1012                .push(KeyValue::new("data", "a".repeat(1000)));
1013            spans.push(span);
1014        }
1015
1016        // Make sure environment variables don't interfere with our test
1017        std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1018
1019        // Create exporter with no compression (level 0)
1020        let no_compression_output = Arc::new(TestOutput::new());
1021        let no_compression_exporter = OtlpStdoutSpanExporter {
1022            compression_level: 0,
1023            resource: None,
1024            output: no_compression_output.clone() as Arc<dyn Output>,
1025            headers: None,
1026            level: None,
1027        };
1028        let _ = no_compression_exporter.export(spans.clone()).await;
1029        let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
1030
1031        // Create exporter with max compression (level 9)
1032        let max_compression_output = Arc::new(TestOutput::new());
1033        let max_compression_exporter = OtlpStdoutSpanExporter {
1034            compression_level: 9,
1035            resource: None,
1036            output: max_compression_output.clone() as Arc<dyn Output>,
1037            headers: None,
1038            level: None,
1039        };
1040        let _ = max_compression_exporter.export(spans.clone()).await;
1041        let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
1042
1043        // Verify that higher compression levels result in smaller payloads
1044        assert!(no_compression_size > max_compression_size,
1045            "Maximum compression (level 9) should produce output no larger than no compression (level 0). Got {} vs {}",
1046            max_compression_size, no_compression_size);
1047
1048        // Verify that all outputs can be properly decoded and contain the same data
1049        let no_compression_spans = decode_and_count_spans(&no_compression_output.get_output()[0]);
1050        let max_compression_spans = decode_and_count_spans(&max_compression_output.get_output()[0]);
1051
1052        assert_eq!(
1053            no_compression_spans,
1054            spans.len(),
1055            "No compression output should contain all spans"
1056        );
1057        assert_eq!(
1058            max_compression_spans,
1059            spans.len(),
1060            "Maximum compression output should contain all spans"
1061        );
1062    }
1063
1064    // Helper function to extract the size of the base64-decoded payload
1065    fn extract_payload_size(json_str: &str) -> usize {
1066        let json: Value = serde_json::from_str(json_str).unwrap();
1067        let payload = json["payload"].as_str().unwrap();
1068        base64_engine.decode(payload).unwrap().len()
1069    }
1070
1071    // Helper function to decode the payload and count the number of spans
1072    fn decode_and_count_spans(json_str: &str) -> usize {
1073        let request = decode_export_request(json_str);
1074
1075        // Count total spans across all resource spans
1076        let mut span_count = 0;
1077        for resource_span in &request.resource_spans {
1078            for scope_span in &resource_span.scope_spans {
1079                span_count += scope_span.spans.len();
1080            }
1081        }
1082
1083        span_count
1084    }
1085
1086    #[tokio::test]
1087    async fn test_export_single_span() {
1088        let (exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1089        let span = create_test_span();
1090
1091        let result = exporter.export(vec![span]).await;
1092        assert!(result.is_ok());
1093
1094        let output = output.get_output();
1095        assert_eq!(output.len(), 1);
1096
1097        // Parse and verify the output
1098        let json: Value = serde_json::from_str(&output[0]).unwrap();
1099        assert_eq!(json["__otel_otlp_stdout"], VERSION);
1100        assert_eq!(json["method"], "POST");
1101        assert_eq!(json["content-type"], "application/x-protobuf");
1102        assert_eq!(json["content-encoding"], "gzip");
1103        assert_eq!(json["base64"], true);
1104
1105        // Verify payload is valid base64 and can be decoded
1106        let payload = json["payload"].as_str().unwrap();
1107        let decoded = base64_engine.decode(payload).unwrap();
1108
1109        // Verify it can be decompressed
1110        let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
1111        let mut decompressed = Vec::new();
1112        decoder.read_to_end(&mut decompressed).unwrap();
1113
1114        // Verify it's valid OTLP protobuf
1115        let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
1116        assert_eq!(request.resource_spans.len(), 1);
1117    }
1118
1119    #[tokio::test]
1120    async fn test_export_preserves_remote_parent_flags_and_resource_attributes() {
1121        let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1122        let resource = Resource::builder_empty()
1123            .with_attributes([
1124                KeyValue::new("service.name", "span-exporter-tests"),
1125                KeyValue::new("deployment.environment", "test"),
1126            ])
1127            .build();
1128        exporter.set_resource(&resource);
1129
1130        let mut span = create_test_span();
1131        span.parent_span_is_remote = true;
1132
1133        exporter.export(vec![span]).await.unwrap();
1134
1135        let output = output.get_output();
1136        assert_eq!(output.len(), 1);
1137
1138        let request = decode_export_request(&output[0]);
1139        let resource_span = request.resource_spans.first().unwrap();
1140        let scope_span = resource_span.scope_spans.first().unwrap();
1141        let exported_span = scope_span.spans.first().unwrap();
1142
1143        assert_eq!(
1144            exported_span.flags & SpanFlags::ContextHasIsRemoteMask as u32,
1145            SpanFlags::ContextHasIsRemoteMask as u32
1146        );
1147        assert_eq!(
1148            exported_span.flags & SpanFlags::ContextIsRemoteMask as u32,
1149            SpanFlags::ContextIsRemoteMask as u32
1150        );
1151
1152        let resource = resource_span.resource.as_ref().unwrap();
1153        let attrs = &resource.attributes;
1154        assert!(attrs.iter().any(|attr| {
1155            attr.key == "service.name"
1156                && attr.value.as_ref().and_then(|value| value.value.as_ref())
1157                    == Some(&AnyValue::StringValue("span-exporter-tests".to_string()))
1158        }));
1159        assert!(attrs.iter().any(|attr| {
1160            attr.key == "deployment.environment"
1161                && attr.value.as_ref().and_then(|value| value.value.as_ref())
1162                    == Some(&AnyValue::StringValue("test".to_string()))
1163        }));
1164    }
1165
1166    #[tokio::test]
1167    async fn test_export_empty_batch() {
1168        let exporter = OtlpStdoutSpanExporter::default();
1169        let result = exporter.export(vec![]).await;
1170        assert!(result.is_ok());
1171    }
1172
1173    #[tokio::test]
1174    async fn test_export_propagates_output_errors() {
1175        let exporter = OtlpStdoutSpanExporter::builder()
1176            .output(Arc::new(FailingOutput))
1177            .build();
1178
1179        let err = exporter.export(vec![create_test_span()]).await.unwrap_err();
1180        assert!(
1181            matches!(err, OTelSdkError::InternalFailure(message) if message == "intentional test sink failure")
1182        );
1183    }
1184
/// Checks that, with no compression-level environment variable set, the
/// level passed to the builder is the one the exporter ends up with.
#[test]
#[serial]
fn test_gzip_level_configuration() {
    // Start from a clean environment so only the builder option applies.
    std::env::remove_var(env_vars::COMPRESSION_LEVEL);

    let builder = OtlpStdoutSpanExporter::builder().compression_level(9);
    let exporter = builder.build();

    assert_eq!(exporter.compression_level, 9);
}
1197
1198    #[tokio::test]
1199    #[serial]
1200    async fn test_env_var_affects_export_compression() {
1201        // Create more test data with repeated content to make compression differences noticeable
1202        let span = create_test_span();
1203        let mut spans = Vec::new();
1204        // Create 100 spans with large attributes to make compression differences noticeable
1205        for i in 0..100 {
1206            let mut span = span.clone();
1207            // Add unique attribute with large value to make compression more effective
1208            span.attributes
1209                .push(KeyValue::new(format!("test-key-{}", i), "a".repeat(1000)));
1210            spans.push(span);
1211        }
1212
1213        // First, create data with no compression
1214        std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
1215        let no_compression_output = Arc::new(TestOutput::new());
1216        let mut no_compression_exporter = OtlpStdoutSpanExporter::builder()
1217            .compression_level(0)
1218            .build();
1219        no_compression_exporter.output = no_compression_output.clone() as Arc<dyn Output>;
1220        let _ = no_compression_exporter.export(spans.clone()).await;
1221        let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
1222
1223        // Now with max compression
1224        std::env::set_var(env_vars::COMPRESSION_LEVEL, "9");
1225        let max_compression_output = Arc::new(TestOutput::new());
1226        let mut max_compression_exporter = OtlpStdoutSpanExporter::builder()
1227            .compression_level(9)
1228            .build();
1229        max_compression_exporter.output = max_compression_output.clone() as Arc<dyn Output>;
1230        let _ = max_compression_exporter.export(spans.clone()).await;
1231        let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
1232
1233        // Verify that the environment variable affected the compression level
1234        assert!(no_compression_size > max_compression_size,
1235            "Environment variable COMPRESSION_LEVEL=9 should produce smaller output than COMPRESSION_LEVEL=0. Got {} vs {}",
1236            max_compression_size, no_compression_size);
1237
1238        // Test with explicit level when env var is set (env var should take precedence)
1239        std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
1240        let explicit_output = Arc::new(TestOutput::new());
1241
1242        // Create an exporter with the default() method which will use the environment variable
1243        let explicit_exporter = OtlpStdoutSpanExporter::builder()
1244            .output(explicit_output.clone())
1245            .build();
1246
1247        // The environment variable should make it use compression level 0
1248        let _ = explicit_exporter.export(spans.clone()).await;
1249        let explicit_size = extract_payload_size(&explicit_output.get_output()[0]);
1250
1251        // Should be approximately the same size as the no_compression_size since
1252        // the environment variable (level 0) should take precedence
1253        assert!(explicit_size > max_compression_size,
1254            "Environment variable should take precedence over explicitly set level. Expected size closer to {} but got {}",
1255            no_compression_size, explicit_size);
1256
1257        // Clean up
1258        std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1259    }
1260
1261    #[tokio::test]
1262    #[serial]
1263    async fn test_environment_variable_precedence() {
1264        // Set environment variable
1265        std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
1266
1267        // With the new precedence rules, environment variables take precedence
1268        // over constructor parameters
1269        let exporter = OtlpStdoutSpanExporter::builder()
1270            .compression_level(9)
1271            .build();
1272        assert_eq!(exporter.compression_level, 3);
1273
1274        // When environment variable is removed, constructor parameter should be used
1275        std::env::remove_var(env_vars::COMPRESSION_LEVEL);
1276        let exporter = OtlpStdoutSpanExporter::builder()
1277            .compression_level(9)
1278            .build();
1279        assert_eq!(exporter.compression_level, 9);
1280    }
1281
/// Deserializes a hand-written JSON envelope into `ExporterOutput` and checks
/// every field, including that the base64 payload decodes to the expected text.
#[test]
fn test_exporter_output_deserialization() {
    // A sample of what the exporter would emit.
    let json_str = r#"{
            "__otel_otlp_stdout": "0.11.1",
            "source": "test-service",
            "endpoint": "http://localhost:4318/v1/traces",
            "method": "POST",
            "content-type": "application/x-protobuf",
            "content-encoding": "gzip",
            "headers": {
                "api-key": "test-key",
                "custom-header": "test-value"
            },
            "payload": "SGVsbG8gd29ybGQ=",
            "base64": true
        }"#;

    let parsed: ExporterOutput = serde_json::from_str(json_str).unwrap();

    // Scalar fields.
    assert_eq!(parsed.version, "0.11.1");
    assert_eq!(parsed.source, "test-service");
    assert_eq!(parsed.endpoint, "http://localhost:4318/v1/traces");
    assert_eq!(parsed.method, "POST");
    assert_eq!(parsed.content_type, "application/x-protobuf");
    assert_eq!(parsed.content_encoding, "gzip");

    // Header map.
    let headers = parsed.headers.as_ref().unwrap();
    assert_eq!(headers.len(), 2);
    assert_eq!(headers.get("api-key").unwrap(), "test-key");
    assert_eq!(headers.get("custom-header").unwrap(), "test-value");

    // Payload and its encoding flag.
    assert_eq!(parsed.payload, "SGVsbG8gd29ybGQ=");
    assert!(parsed.base64);

    // The payload is valid base64 for a UTF-8 string.
    let payload_bytes = base64_engine.decode(&parsed.payload).unwrap();
    let payload_text = String::from_utf8(payload_bytes).unwrap();
    assert_eq!(payload_text, "Hello world");
}
1332
/// Same as the static deserialization test, but the JSON envelope is built
/// at runtime from owned strings and a freshly encoded payload.
#[test]
fn test_exporter_output_deserialization_dynamic() {
    let version = String::from("0.11.1");
    let service = String::from("dynamic-service");
    let payload = base64_engine.encode("Dynamic payload");

    // Assemble the envelope text with the three dynamic values spliced in.
    let json_str = format!(
        r#"{{
                "__otel_otlp_stdout": "{}",
                "source": "{}",
                "endpoint": "http://localhost:4318/v1/traces",
                "method": "POST",
                "content-type": "application/x-protobuf",
                "content-encoding": "gzip",
                "headers": {{
                    "dynamic-key": "dynamic-value"
                }},
                "payload": "{}",
                "base64": true
            }}"#,
        version, service, payload
    );

    let parsed: ExporterOutput = serde_json::from_str(&json_str).unwrap();

    // Scalar fields.
    assert_eq!(parsed.version, version);
    assert_eq!(parsed.source, service);
    assert_eq!(parsed.endpoint, "http://localhost:4318/v1/traces");
    assert_eq!(parsed.method, "POST");
    assert_eq!(parsed.content_type, "application/x-protobuf");
    assert_eq!(parsed.content_encoding, "gzip");

    // Header map.
    let headers = parsed.headers.as_ref().unwrap();
    assert_eq!(headers.len(), 1);
    assert_eq!(headers.get("dynamic-key").unwrap(), "dynamic-value");

    // Payload and its encoding flag.
    assert_eq!(parsed.payload, payload);
    assert!(parsed.base64);

    // The payload decodes back to the original text.
    let payload_bytes = base64_engine.decode(&parsed.payload).unwrap();
    let payload_text = String::from_utf8(payload_bytes).unwrap();
    assert_eq!(payload_text, "Dynamic payload");
}
1381
/// Checks that `LogLevel::from_str` accepts each level name in lower and
/// upper case, accepts `warning` as an alias for `Warn`, and rejects an
/// unknown name.
#[test]
fn test_log_level_from_str() {
    let accepted = [
        ("debug", LogLevel::Debug),
        ("DEBUG", LogLevel::Debug),
        ("info", LogLevel::Info),
        ("INFO", LogLevel::Info),
        ("warn", LogLevel::Warn),
        ("warning", LogLevel::Warn),
        ("WARN", LogLevel::Warn),
        ("error", LogLevel::Error),
        ("ERROR", LogLevel::Error),
    ];
    for (text, expected) in accepted {
        assert_eq!(LogLevel::from_str(text).unwrap(), expected);
    }

    assert!(LogLevel::from_str("invalid").is_err());
}
1396
/// Checks that each `LogLevel` variant renders as its upper-case name.
#[test]
fn test_log_level_display() {
    let expected_text = [
        (LogLevel::Debug, "DEBUG"),
        (LogLevel::Info, "INFO"),
        (LogLevel::Warn, "WARN"),
        (LogLevel::Error, "ERROR"),
    ];
    for (level, text) in expected_text {
        assert_eq!(level.to_string(), text);
    }
}
1404
/// Checks that `BufferOutput` returns written lines in order, that taking
/// the lines empties the buffer, and that its `Debug` output names the type.
#[test]
fn test_buffer_output_round_trip() {
    let buffer = BufferOutput::new();
    for line in ["first", "second"] {
        buffer.write_line(line).unwrap();
    }

    let drained = buffer.take_lines().unwrap();
    assert_eq!(drained, vec!["first".to_string(), "second".to_string()]);

    // A second take finds nothing left.
    let leftover = buffer.take_lines().unwrap();
    assert!(leftover.is_empty());

    let debug_repr = format!("{buffer:?}");
    assert!(debug_repr.contains("BufferOutput"));
}
1418
/// Checks how the exporter's log level is resolved from
/// `env_vars::LOG_LEVEL` and the builder: a valid variable is used, an
/// invalid one yields no level, the builder value applies when the variable
/// is unset, and the variable wins when both are given.
#[test]
#[serial]
fn test_log_level_from_env() {
    // Every builder-based exporter in this test asks for `Error`.
    let build_with_error_level = || {
        OtlpStdoutSpanExporter::builder()
            .level(LogLevel::Error)
            .build()
    };

    // Valid variable.
    std::env::set_var(env_vars::LOG_LEVEL, "debug");
    assert_eq!(
        OtlpStdoutSpanExporter::default().level,
        Some(LogLevel::Debug)
    );

    // Invalid variable.
    std::env::set_var(env_vars::LOG_LEVEL, "invalid");
    assert_eq!(OtlpStdoutSpanExporter::default().level, None);

    // No variable: the builder parameter is used.
    std::env::remove_var(env_vars::LOG_LEVEL);
    assert_eq!(build_with_error_level().level, Some(LogLevel::Error));

    // Variable and builder parameter both present: the variable wins.
    std::env::set_var(env_vars::LOG_LEVEL, "warn");
    assert_eq!(build_with_error_level().level, Some(LogLevel::Warn));

    // Clean up.
    std::env::remove_var(env_vars::LOG_LEVEL);
}
1449
/// Checks that a numeric but unacceptable compression level in the
/// environment (`99`) is ignored in favour of the builder's value.
#[test]
#[serial]
fn test_invalid_numeric_compression_level_falls_back() {
    std::env::set_var(env_vars::COMPRESSION_LEVEL, "99");

    let builder = OtlpStdoutSpanExporter::builder().compression_level(4);
    let exporter = builder.build();
    assert_eq!(exporter.compression_level, 4);

    std::env::remove_var(env_vars::COMPRESSION_LEVEL);
}
1460
/// Checks how builder headers and the two header environment variables are
/// merged: trace-specific values override generic and builder values,
/// `content-type` / `content-encoding` are dropped, malformed entries are
/// ignored, and `parse_headers` returns `None` when nothing usable remains.
#[test]
#[serial]
fn test_header_merge_and_filtering() {
    std::env::set_var(
        env_vars::OTLP_HEADERS,
        "content-type=bad, malformed, x-env=env-value",
    );
    std::env::set_var(
        env_vars::OTLP_TRACES_HEADERS,
        "content-encoding=bad, x-env=trace-value, x-trace=trace-only",
    );

    let constructor_headers = HashMap::from([
        ("x-constructor".to_string(), "constructor-value".to_string()),
        ("x-env".to_string(), "constructor-env".to_string()),
    ]);

    let merged = OtlpStdoutSpanExporter::builder()
        .headers(constructor_headers)
        .build()
        .headers
        .unwrap();

    assert_eq!(merged.get("x-constructor").unwrap(), "constructor-value");
    assert_eq!(merged.get("x-env").unwrap(), "trace-value");
    assert_eq!(merged.get("x-trace").unwrap(), "trace-only");
    for reserved in ["content-type", "content-encoding"] {
        assert!(!merged.contains_key(reserved));
    }

    // Only reserved and malformed entries left: nothing should be parsed.
    std::env::set_var(
        env_vars::OTLP_HEADERS,
        "content-type=bad, content-encoding=bad, malformed",
    );
    std::env::remove_var(env_vars::OTLP_TRACES_HEADERS);
    let parsed = OtlpStdoutSpanExporter::parse_headers();
    assert!(parsed.is_none());

    std::env::remove_var(env_vars::OTLP_HEADERS);
    std::env::remove_var(env_vars::OTLP_TRACES_HEADERS);
}
1498
/// Checks that `force_flush` and `shutdown_with_timeout` both succeed on a
/// default exporter.
#[test]
fn test_shutdown_and_force_flush_are_noops() {
    let mut exporter = OtlpStdoutSpanExporter::default();

    let flush_result = exporter.force_flush();
    assert!(flush_result.is_ok());

    let timeout = Duration::from_millis(1);
    let shutdown_result = exporter.shutdown_with_timeout(timeout);
    assert!(shutdown_result.is_ok());
}
1507
1508    #[tokio::test]
1509    #[serial]
1510    async fn test_log_level_in_output() {
1511        // Create a test exporter with a specific log level
1512        let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1513        exporter.level = Some(LogLevel::Debug);
1514        let span = create_test_span();
1515
1516        let result = exporter.export(vec![span]).await;
1517        assert!(result.is_ok());
1518
1519        let output_lines = output.get_output();
1520        assert_eq!(output_lines.len(), 1);
1521
1522        // Parse the JSON to check the level field
1523        let json: Value = serde_json::from_str(&output_lines[0]).unwrap();
1524        assert_eq!(json["level"], "DEBUG");
1525
1526        // Test with no level set
1527        let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
1528        exporter.level = None;
1529        let span = create_test_span();
1530
1531        let result = exporter.export(vec![span]).await;
1532        assert!(result.is_ok());
1533
1534        let output_lines = output.get_output();
1535        assert_eq!(output_lines.len(), 1);
1536
1537        // Parse the JSON to check level field is omitted
1538        let json: Value = serde_json::from_str(&output_lines[0]).unwrap();
1539        assert!(!json.as_object().unwrap().contains_key("level"));
1540    }
1541
/// Writes one line through `NamedPipeOutput` into a real FIFO and checks
/// that a reader on the other end receives the line followed by a newline.
#[cfg(unix)]
#[test]
fn test_named_pipe_output_writes_to_real_fifo() {
    let fifo_path = create_test_fifo("write-line");

    // Read on a separate thread until the writer closes its end.
    let reader_path = fifo_path.clone();
    let reader = thread::spawn(move || {
        let mut fifo = OpenOptions::new().read(true).open(&reader_path).unwrap();
        let mut text = String::new();
        fifo.read_to_string(&mut text).unwrap();
        text
    });

    let sink = NamedPipeOutput {
        path: fifo_path.clone(),
    };
    assert!(sink.is_pipe());
    sink.write_line("hello from fifo").unwrap();

    let received = reader.join().unwrap();
    assert_eq!(received, "hello from fifo\n");

    std::fs::remove_file(fifo_path).unwrap();
}
1566
1567    #[cfg(unix)]
1568    #[tokio::test]
1569    async fn test_export_empty_batch_touches_real_fifo() {
1570        let path = create_test_fifo("touch-pipe");
1571        let path_for_reader = path.clone();
1572
1573        let reader = thread::spawn(move || {
1574            let mut file = OpenOptions::new()
1575                .read(true)
1576                .open(&path_for_reader)
1577                .unwrap();
1578            let mut bytes = Vec::new();
1579            file.read_to_end(&mut bytes).unwrap();
1580            bytes
1581        });
1582
1583        let exporter = OtlpStdoutSpanExporter {
1584            compression_level: defaults::COMPRESSION_LEVEL,
1585            resource: None,
1586            headers: None,
1587            output: Arc::new(NamedPipeOutput { path: path.clone() }),
1588            level: None,
1589        };
1590
1591        exporter.export(vec![]).await.unwrap();
1592
1593        let bytes = reader.join().unwrap();
1594        assert!(bytes.is_empty());
1595        std::fs::remove_file(path).unwrap();
1596    }
1597
/// Checks that `create_output(false)` yields an output whose `Debug`
/// representation names `StdOutput`.
#[test]
fn test_stdout_output() {
    // Stdout itself is hard to capture here, so only the type is checked.
    let rendered = format!("{:?}", create_output(false));
    assert!(rendered.contains("StdOutput"));
}
1604
/// Checks that `create_output(true)` yields an output whose `Debug`
/// representation names either `NamedPipeOutput` or `StdOutput`.
#[test]
fn test_pipe_output() {
    let rendered = format!("{:?}", create_output(true));

    // Either kind is accepted, so the test does not depend on the pipe existing.
    let is_pipe = rendered.contains("NamedPipeOutput");
    let is_stdout = rendered.contains("StdOutput");
    assert!(is_pipe || is_stdout);
}
1612
/// Checks that a default exporter built while `env_vars::OUTPUT_TYPE` is
/// `pipe` ends up with a `NamedPipeOutput` or `StdOutput` output.
///
/// Marked `#[serial]` because it mutates a process-wide environment variable
/// that other tests read while constructing exporters. The variable is
/// restored before asserting so a failure cannot leak it into later tests.
#[test]
#[serial]
fn test_env_var_precedence() {
    // Scratch path removed on the way out if something left it behind.
    let temp_dir = std::env::temp_dir();
    let path = temp_dir.join("test_pipe");

    // Request pipe output through the environment.
    std::env::set_var(env_vars::OUTPUT_TYPE, "pipe");

    let exporter = OtlpStdoutSpanExporter::default();
    let debug_str = format!("{:?}", exporter.output);

    // Clean up before asserting.
    std::env::remove_var(env_vars::OUTPUT_TYPE);
    if path.exists() {
        let _ = std::fs::remove_file(path);
    }

    assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
}
1638
/// Checks that, with `env_vars::OUTPUT_TYPE` unset, an exporter built with
/// `.pipe(true)` ends up with a `NamedPipeOutput` or `StdOutput` output.
///
/// Marked `#[serial]` because it removes a process-wide environment variable
/// that other tests set, and relies on it staying unset while the exporter
/// is built.
#[test]
#[serial]
fn test_constructor_precedence() {
    // Scratch path removed on the way out if something left it behind.
    let temp_dir = std::env::temp_dir();
    let path = temp_dir.join("test_pipe");

    // Only the builder option should influence the output type.
    std::env::remove_var(env_vars::OUTPUT_TYPE);

    let exporter = OtlpStdoutSpanExporter::builder().pipe(true).build();
    let debug_str = format!("{:?}", exporter.output);

    // Clean up before asserting.
    if path.exists() {
        let _ = std::fs::remove_file(path);
    }

    assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
}
1660
/// Builds an exporter with `.pipe(false)` while `env_vars::OUTPUT_TYPE` is
/// `pipe` and checks that the resulting output is a `NamedPipeOutput` or
/// `StdOutput`.
///
/// Marked `#[serial]` because it mutates a process-wide environment variable
/// that other tests read while constructing exporters. The variable is
/// restored before asserting so a failure cannot leak it into later tests.
#[test]
#[serial]
fn test_env_var_overrides_constructor() {
    // Scratch path removed on the way out if something left it behind.
    let temp_dir = std::env::temp_dir();
    let path = temp_dir.join("test_pipe");

    // The environment asks for pipe output, the builder for stdout.
    std::env::set_var(env_vars::OUTPUT_TYPE, "pipe");

    let exporter = OtlpStdoutSpanExporter::builder().pipe(false).build();
    let debug_str = format!("{:?}", exporter.output);

    // Clean up before asserting.
    std::env::remove_var(env_vars::OUTPUT_TYPE);
    if path.exists() {
        let _ = std::fs::remove_file(path);
    }

    // NOTE(review): both output kinds are accepted, so this does not by
    // itself prove that the environment variable won; confirm whether a
    // stricter check is possible given the pipe fallback behaviour.
    assert!(debug_str.contains("NamedPipeOutput") || debug_str.contains("StdOutput"));
}
1683}