1use async_trait::async_trait;
88use base64::{engine::general_purpose::STANDARD as base64_engine, Engine};
89use flate2::{write::GzEncoder, Compression};
90use futures_util::future::BoxFuture;
91use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
92use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
93use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
94use opentelemetry_sdk::resource::Resource;
95use opentelemetry_sdk::{
96 error::OTelSdkError,
97 trace::{SpanData, SpanExporter},
98};
99use prost::Message;
100use serde::Serialize;
101#[cfg(test)]
102use std::sync::Mutex;
103use std::{
104 collections::HashMap,
105 env,
106 io::{self, Write},
107 result::Result,
108 sync::Arc,
109};
110
111const VERSION: &str = env!("CARGO_PKG_VERSION");
112const DEFAULT_ENDPOINT: &str = "http://localhost:4318/v1/traces";
113const DEFAULT_COMPRESSION_LEVEL: u8 = 6;
114const COMPRESSION_LEVEL_ENV_VAR: &str = "OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL";
115
116trait Output: Send + Sync + std::fmt::Debug {
121 fn write_line(&self, line: &str) -> Result<(), OTelSdkError>;
131}
132
133#[derive(Debug, Default)]
135struct StdOutput;
136
137impl Output for StdOutput {
138 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
139 let stdout = io::stdout();
141 let mut handle = stdout.lock();
142
143 writeln!(handle, "{}", line).map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
145
146 Ok(())
147 }
148}
149
150#[cfg(test)]
152#[derive(Debug, Default)]
153struct TestOutput {
154 buffer: Arc<Mutex<Vec<String>>>,
155}
156
157#[cfg(test)]
158impl TestOutput {
159 fn new() -> Self {
160 Self {
161 buffer: Arc::new(Mutex::new(Vec::new())),
162 }
163 }
164
165 fn get_output(&self) -> Vec<String> {
166 self.buffer.lock().unwrap().clone()
167 }
168}
169
170#[cfg(test)]
171impl Output for TestOutput {
172 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
173 self.buffer.lock().unwrap().push(line.to_string());
174 Ok(())
175 }
176}
177
178#[derive(Debug, Serialize)]
183struct ExporterOutput<'a> {
184 #[serde(rename = "__otel_otlp_stdout")]
186 version: &'a str,
187 source: String,
189 endpoint: &'a str,
191 method: &'a str,
193 #[serde(rename = "content-type")]
195 content_type: &'a str,
196 #[serde(rename = "content-encoding")]
198 content_encoding: &'a str,
199 #[serde(skip_serializing_if = "HashMap::is_empty")]
201 headers: HashMap<String, String>,
202 payload: String,
204 base64: bool,
206}
207
208#[derive(Debug)]
230pub struct OtlpStdoutSpanExporter {
231 gzip_level: u8,
233 resource: Option<Resource>,
235 output: Arc<dyn Output>,
237}
238
239impl Default for OtlpStdoutSpanExporter {
240 fn default() -> Self {
241 Self::new()
242 }
243}
244
245impl OtlpStdoutSpanExporter {
246 pub fn new() -> Self {
260 let gzip_level =
261 Self::get_compression_level_from_env().unwrap_or(DEFAULT_COMPRESSION_LEVEL);
262 Self {
263 gzip_level,
264 resource: None,
265 output: Arc::new(StdOutput),
266 }
267 }
268
269 pub fn with_gzip_level(gzip_level: u8) -> Self {
287 Self {
288 gzip_level,
289 resource: None,
290 output: Arc::new(StdOutput),
291 }
292 }
293
294 fn get_compression_level_from_env() -> Option<u8> {
300 env::var(COMPRESSION_LEVEL_ENV_VAR)
301 .ok()
302 .and_then(|val| val.parse::<u8>().ok())
303 .and_then(|level| if level <= 9 { Some(level) } else { None })
304 }
305
306 #[cfg(test)]
307 fn with_test_output() -> (Self, Arc<TestOutput>) {
308 let output = Arc::new(TestOutput::new());
309 let gzip_level =
310 Self::get_compression_level_from_env().unwrap_or(DEFAULT_COMPRESSION_LEVEL);
311 let exporter = Self {
312 gzip_level,
313 resource: None,
314 output: output.clone() as Arc<dyn Output>,
315 };
316 (exporter, output)
317 }
318
319 fn parse_headers() -> HashMap<String, String> {
324 let mut headers = HashMap::new();
325
326 if let Ok(global_headers) = env::var("OTEL_EXPORTER_OTLP_HEADERS") {
328 Self::parse_header_string(&global_headers, &mut headers);
329 }
330
331 if let Ok(trace_headers) = env::var("OTEL_EXPORTER_OTLP_TRACES_HEADERS") {
333 Self::parse_header_string(&trace_headers, &mut headers);
334 }
335
336 headers
337 }
338
339 fn parse_header_string(header_str: &str, headers: &mut HashMap<String, String>) {
346 for pair in header_str.split(',') {
347 if let Some((key, value)) = pair.split_once('=') {
348 let key = key.trim().to_lowercase();
349 if key != "content-type" && key != "content-encoding" {
351 headers.insert(key, value.trim().to_string());
352 }
353 }
354 }
355 }
356
357 fn get_service_name() -> String {
364 env::var("OTEL_SERVICE_NAME")
365 .or_else(|_| env::var("AWS_LAMBDA_FUNCTION_NAME"))
366 .unwrap_or_else(|_| "unknown-service".to_string())
367 }
368}
369
370#[async_trait]
371impl SpanExporter for OtlpStdoutSpanExporter {
372 fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, Result<(), OTelSdkError>> {
389 let result = (|| {
391 let resource = self
393 .resource
394 .clone()
395 .unwrap_or_else(|| opentelemetry_sdk::Resource::builder_empty().build());
396 let resource_attrs = ResourceAttributesWithSchema::from(&resource);
397 let resource_spans = group_spans_by_resource_and_scope(batch, &resource_attrs);
398 let request = ExportTraceServiceRequest { resource_spans };
399
400 let proto_bytes = request.encode_to_vec();
402
403 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.gzip_level as u32));
405 encoder
406 .write_all(&proto_bytes)
407 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
408 let compressed_bytes = encoder
409 .finish()
410 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
411
412 let payload = base64_engine.encode(compressed_bytes);
414
415 let output_data = ExporterOutput {
417 version: VERSION,
418 source: Self::get_service_name(),
419 endpoint: DEFAULT_ENDPOINT,
420 method: "POST",
421 content_type: "application/x-protobuf",
422 content_encoding: "gzip",
423 headers: Self::parse_headers(),
424 payload,
425 base64: true,
426 };
427
428 self.output.write_line(
430 &serde_json::to_string(&output_data)
431 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?,
432 )?;
433
434 Ok(())
435 })();
436
437 Box::pin(std::future::ready(result))
439 }
440
441 fn shutdown(&mut self) -> Result<(), OTelSdkError> {
449 Ok(())
450 }
451
452 fn force_flush(&mut self) -> Result<(), OTelSdkError> {
460 Ok(())
461 }
462
463 fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
473 self.resource = Some(<opentelemetry_sdk::Resource as Into<Resource>>::into(
474 resource.clone(),
475 ));
476 }
477}
478
479#[cfg(doctest)]
480#[macro_use]
481extern crate doc_comment;
482
483#[cfg(doctest)]
484use doc_comment::doctest;
485
486#[cfg(doctest)]
487doctest!("../README.md", readme);
488
489#[cfg(test)]
490mod tests {
491 use super::*;
492 use opentelemetry::{
493 trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
494 InstrumentationScope, KeyValue,
495 };
496 use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks};
497 use serde_json::Value;
498 use std::time::SystemTime;
499
500 fn create_test_span() -> SpanData {
501 let trace_id_bytes = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42];
502 let span_id_bytes = [0, 0, 0, 0, 0, 0, 0, 123];
503 let parent_id_bytes = [0, 0, 0, 0, 0, 0, 0, 42];
504
505 let span_context = SpanContext::new(
506 TraceId::from_bytes(trace_id_bytes),
507 SpanId::from_bytes(span_id_bytes),
508 TraceFlags::default(),
509 false,
510 TraceState::default(),
511 );
512
513 SpanData {
514 span_context,
515 parent_span_id: SpanId::from_bytes(parent_id_bytes),
516 span_kind: SpanKind::Client,
517 name: "test-span".into(),
518 start_time: SystemTime::UNIX_EPOCH,
519 end_time: SystemTime::UNIX_EPOCH,
520 attributes: vec![KeyValue::new("test.key", "test-value")],
521 dropped_attributes_count: 0,
522 events: SpanEvents::default(),
523 links: SpanLinks::default(),
524 status: Status::Ok,
525 instrumentation_scope: InstrumentationScope::builder("test-library")
526 .with_version("1.0.0")
527 .with_schema_url("https://opentelemetry.io/schema/1.0.0")
528 .build(),
529 }
530 }
531
532 #[test]
533 fn test_parse_headers() {
534 std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", "key1=value1,key2=value2");
535 std::env::set_var(
536 "OTEL_EXPORTER_OTLP_TRACES_HEADERS",
537 "key2=override,key3=value3",
538 );
539
540 let headers = OtlpStdoutSpanExporter::parse_headers();
541
542 assert_eq!(headers.get("key1").unwrap(), "value1");
543 assert_eq!(headers.get("key2").unwrap(), "override");
544 assert_eq!(headers.get("key3").unwrap(), "value3");
545
546 std::env::remove_var("OTEL_EXPORTER_OTLP_HEADERS");
548 std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
549 }
550
551 #[test]
552 fn test_service_name_resolution() {
553 std::env::set_var("OTEL_SERVICE_NAME", "otel-service");
555 std::env::set_var("AWS_LAMBDA_FUNCTION_NAME", "lambda-function");
556 assert_eq!(OtlpStdoutSpanExporter::get_service_name(), "otel-service");
557
558 std::env::remove_var("OTEL_SERVICE_NAME");
560 assert_eq!(
561 OtlpStdoutSpanExporter::get_service_name(),
562 "lambda-function"
563 );
564
565 std::env::remove_var("AWS_LAMBDA_FUNCTION_NAME");
567 assert_eq!(
568 OtlpStdoutSpanExporter::get_service_name(),
569 "unknown-service"
570 );
571 }
572
573 #[test]
574 fn test_compression_level_from_env() {
575 std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "3");
577 assert_eq!(
578 OtlpStdoutSpanExporter::get_compression_level_from_env(),
579 Some(3)
580 );
581
582 std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "10");
584 assert_eq!(
585 OtlpStdoutSpanExporter::get_compression_level_from_env(),
586 None
587 );
588
589 std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "invalid");
591 assert_eq!(
592 OtlpStdoutSpanExporter::get_compression_level_from_env(),
593 None
594 );
595
596 std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);
598 assert_eq!(
599 OtlpStdoutSpanExporter::get_compression_level_from_env(),
600 None
601 );
602 }
603
604 #[test]
605 fn test_new_uses_env_compression_level() {
606 std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "3");
608 let exporter = OtlpStdoutSpanExporter::new();
609 assert_eq!(exporter.gzip_level, 3);
610
611 std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);
613 let exporter = OtlpStdoutSpanExporter::new();
614 assert_eq!(exporter.gzip_level, DEFAULT_COMPRESSION_LEVEL);
615 }
616
617 #[test]
618 fn test_with_gzip_level_overrides_env() {
619 std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "3");
621
622 let exporter = OtlpStdoutSpanExporter::with_gzip_level(8);
624 assert_eq!(exporter.gzip_level, 8);
625
626 std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);
628 }
629
630 #[tokio::test]
631 async fn test_compression_level_affects_output_size() {
632 let mut spans = Vec::new();
634 for i in 0..100 {
635 let mut span = create_test_span();
636 span.attributes.push(KeyValue::new("index", i));
638 span.attributes.push(KeyValue::new("data", "a".repeat(100)));
639 spans.push(span);
640 }
641
642 let (mut no_compression_exporter, no_compression_output) =
644 OtlpStdoutSpanExporter::with_test_output();
645 no_compression_exporter.gzip_level = 0;
646 let _ = no_compression_exporter.export(spans.clone()).await;
647 let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
648
649 let (mut medium_compression_exporter, medium_compression_output) =
651 OtlpStdoutSpanExporter::with_test_output();
652 medium_compression_exporter.gzip_level = 5;
653 let _ = medium_compression_exporter.export(spans.clone()).await;
654 let medium_compression_size =
655 extract_payload_size(&medium_compression_output.get_output()[0]);
656
657 let (mut max_compression_exporter, max_compression_output) =
659 OtlpStdoutSpanExporter::with_test_output();
660 max_compression_exporter.gzip_level = 9;
661 let _ = max_compression_exporter.export(spans.clone()).await;
662 let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
663
664 assert!(no_compression_size > medium_compression_size,
666 "Medium compression (level 5) should produce smaller output than no compression (level 0). Got {} vs {}",
667 medium_compression_size, no_compression_size);
668
669 assert!(medium_compression_size >= max_compression_size,
670 "Maximum compression (level 9) should produce output no larger than medium compression (level 5). Got {} vs {}",
671 max_compression_size, medium_compression_size);
672
673 let no_compression_spans = decode_and_count_spans(&no_compression_output.get_output()[0]);
675 let medium_compression_spans =
676 decode_and_count_spans(&medium_compression_output.get_output()[0]);
677 let max_compression_spans = decode_and_count_spans(&max_compression_output.get_output()[0]);
678
679 assert_eq!(
680 no_compression_spans,
681 spans.len(),
682 "No compression output should contain all spans"
683 );
684 assert_eq!(
685 medium_compression_spans,
686 spans.len(),
687 "Medium compression output should contain all spans"
688 );
689 assert_eq!(
690 max_compression_spans,
691 spans.len(),
692 "Maximum compression output should contain all spans"
693 );
694 }
695
696 fn extract_payload_size(json_str: &str) -> usize {
698 let json: Value = serde_json::from_str(json_str).unwrap();
699 let payload = json["payload"].as_str().unwrap();
700 base64_engine.decode(payload).unwrap().len()
701 }
702
703 fn decode_and_count_spans(json_str: &str) -> usize {
705 let json: Value = serde_json::from_str(json_str).unwrap();
706 let payload = json["payload"].as_str().unwrap();
707 let decoded = base64_engine.decode(payload).unwrap();
708
709 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
710 let mut decompressed = Vec::new();
711 std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();
712
713 let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
714
715 let mut span_count = 0;
717 for resource_span in &request.resource_spans {
718 for scope_span in &resource_span.scope_spans {
719 span_count += scope_span.spans.len();
720 }
721 }
722
723 span_count
724 }
725
726 #[tokio::test]
727 async fn test_export_single_span() {
728 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
729 let span = create_test_span();
730
731 let result = exporter.export(vec![span]).await;
732 assert!(result.is_ok());
733
734 let output = output.get_output();
735 assert_eq!(output.len(), 1);
736
737 let json: Value = serde_json::from_str(&output[0]).unwrap();
739 assert_eq!(json["__otel_otlp_stdout"], VERSION);
740 assert_eq!(json["method"], "POST");
741 assert_eq!(json["content-type"], "application/x-protobuf");
742 assert_eq!(json["content-encoding"], "gzip");
743 assert_eq!(json["base64"], true);
744
745 let payload = json["payload"].as_str().unwrap();
747 let decoded = base64_engine.decode(payload).unwrap();
748
749 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
751 let mut decompressed = Vec::new();
752 std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();
753
754 let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
756 assert_eq!(request.resource_spans.len(), 1);
757 }
758
759 #[tokio::test]
760 async fn test_export_empty_batch() {
761 let mut exporter = OtlpStdoutSpanExporter::new();
762 let result = exporter.export(vec![]).await;
763 assert!(result.is_ok());
764 }
765
766 #[test]
767 fn test_gzip_level_configuration() {
768 let exporter = OtlpStdoutSpanExporter::with_gzip_level(9);
769 assert_eq!(exporter.gzip_level, 9);
770 }
771
772 #[tokio::test]
773 async fn test_env_var_affects_export_compression() {
774 let span = create_test_span();
776 let spans = vec![span];
777
778 std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "0");
780 let (mut env_exporter_0, env_output_0) = OtlpStdoutSpanExporter::with_test_output();
781 let _ = env_exporter_0.export(spans.clone()).await;
782 let env_size_0 = extract_payload_size(&env_output_0.get_output()[0]);
783
784 std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "9");
786 let (mut env_exporter_9, env_output_9) = OtlpStdoutSpanExporter::with_test_output();
787 let _ = env_exporter_9.export(spans.clone()).await;
788 let env_size_9 = extract_payload_size(&env_output_9.get_output()[0]);
789
790 assert!(env_size_0 > env_size_9,
792 "Environment variable COMPRESSION_LEVEL=9 should produce smaller output than COMPRESSION_LEVEL=0. Got {} vs {}",
793 env_size_9, env_size_0);
794
795 std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "invalid");
797 let (mut env_exporter_invalid, _env_output_invalid) =
798 OtlpStdoutSpanExporter::with_test_output();
799 let _ = env_exporter_invalid.export(spans.clone()).await;
800
801 std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "0");
803 let (mut explicit_exporter, explicit_output) = OtlpStdoutSpanExporter::with_test_output();
804 explicit_exporter.gzip_level = 9;
805 let _ = explicit_exporter.export(spans.clone()).await;
806 let explicit_size = extract_payload_size(&explicit_output.get_output()[0]);
807
808 assert!(env_size_0 > explicit_size,
810 "Explicit level 9 should produce smaller output than environment variable level 0. Got {} vs {}",
811 explicit_size, env_size_0);
812
813 std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);
815 }
816}