1use async_trait::async_trait;
83use base64::{engine::general_purpose::STANDARD as base64_engine, Engine};
84use flate2::{write::GzEncoder, Compression};
85use futures_util::future::BoxFuture;
86use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
87use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
88use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
89use opentelemetry_sdk::resource::Resource;
90use opentelemetry_sdk::{
91 error::OTelSdkError,
92 trace::{SpanData, SpanExporter},
93};
94use prost::Message;
95use serde::Serialize;
96#[cfg(test)]
97use std::sync::Mutex;
98use std::{collections::HashMap, env, io::Write, result::Result, sync::Arc};
99
/// Version of this crate, captured at compile time from `CARGO_PKG_VERSION`.
const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Endpoint string recorded in the `endpoint` field of every exported record.
const DEFAULT_ENDPOINT: &str = "http://localhost:4318/v1/traces";
/// Gzip level used when the environment does not provide a valid one.
const DEFAULT_COMPRESSION_LEVEL: u8 = 6;
/// Name of the environment variable consulted for the gzip compression level.
const COMPRESSION_LEVEL_ENV_VAR: &str = "OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL";
104
/// Sink that receives the serialized records produced by the exporter.
///
/// `Debug` is required so that a type holding an `Arc<dyn Output>` can itself
/// derive `Debug`.
trait Output: Send + Sync + std::fmt::Debug {
    /// Writes a single record, `line`, to the sink.
    ///
    /// Failures are reported as [`OTelSdkError`].
    fn write_line(&self, line: &str) -> Result<(), OTelSdkError>;
}
121
122#[derive(Debug, Default)]
124struct StdOutput;
125
126impl Output for StdOutput {
127 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
128 println!("{}", line);
129 Ok(())
130 }
131}
132
/// In-memory sink used by the unit tests to inspect what was written.
#[cfg(test)]
#[derive(Debug, Default)]
struct TestOutput {
    /// Captured lines, shared behind a mutex.
    buffer: Arc<Mutex<Vec<String>>>,
}

#[cfg(test)]
impl TestOutput {
    /// Creates a sink whose capture buffer is empty.
    fn new() -> Self {
        let buffer = Arc::new(Mutex::new(Vec::new()));
        Self { buffer }
    }

    /// Returns a copy of every line currently held in the buffer.
    ///
    /// # Panics
    ///
    /// Panics if the buffer mutex is poisoned.
    fn get_output(&self) -> Vec<String> {
        let captured = self.buffer.lock().unwrap();
        captured.to_vec()
    }
}
152
#[cfg(test)]
impl Output for TestOutput {
    /// Appends an owned copy of `line` to the capture buffer and returns `Ok`.
    ///
    /// # Panics
    ///
    /// Panics if the buffer mutex is poisoned.
    fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
        let mut captured = self.buffer.lock().unwrap();
        captured.push(String::from(line));
        Ok(())
    }
}
160
/// JSON envelope serialized once per exported batch.
#[derive(Debug, Serialize)]
struct ExporterOutput<'a> {
    /// Exporter version; serialized under the key `__otel_otlp_stdout`.
    #[serde(rename = "__otel_otlp_stdout")]
    version: &'a str,
    /// Name of the service the spans are attributed to.
    source: String,
    /// Endpoint string recorded alongside the payload.
    endpoint: &'a str,
    /// HTTP method recorded alongside the payload.
    method: &'a str,
    /// Media type of the payload; serialized under the key `content-type`.
    #[serde(rename = "content-type")]
    content_type: &'a str,
    /// Encoding applied to the payload; serialized under the key `content-encoding`.
    #[serde(rename = "content-encoding")]
    content_encoding: &'a str,
    /// Additional headers; the field is omitted from the JSON when the map is empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    headers: HashMap<String, String>,
    /// The encoded batch as text.
    payload: String,
    /// Whether `payload` is base64-encoded.
    base64: bool,
}
190
/// Span exporter that writes each batch as one JSON line to an output sink
/// (standard output unless a test sink is installed).
///
/// Every batch is encoded as an `ExportTraceServiceRequest` protobuf message,
/// gzip-compressed, base64-encoded and wrapped in a JSON envelope.
#[derive(Debug)]
pub struct OtlpStdoutSpanExporter {
    /// Gzip compression level handed to the encoder on every export.
    gzip_level: u8,
    /// Resource attached to exported spans; `None` until `set_resource` is called.
    resource: Option<Resource>,
    /// Sink that receives the serialized records.
    output: Arc<dyn Output>,
}
221
222impl Default for OtlpStdoutSpanExporter {
223 fn default() -> Self {
224 Self::new()
225 }
226}
227
228impl OtlpStdoutSpanExporter {
229 pub fn new() -> Self {
243 let gzip_level =
244 Self::get_compression_level_from_env().unwrap_or(DEFAULT_COMPRESSION_LEVEL);
245 Self {
246 gzip_level,
247 resource: None,
248 output: Arc::new(StdOutput),
249 }
250 }
251
252 pub fn with_gzip_level(gzip_level: u8) -> Self {
270 Self {
271 gzip_level,
272 resource: None,
273 output: Arc::new(StdOutput),
274 }
275 }
276
277 fn get_compression_level_from_env() -> Option<u8> {
283 env::var(COMPRESSION_LEVEL_ENV_VAR)
284 .ok()
285 .and_then(|val| val.parse::<u8>().ok())
286 .and_then(|level| if level <= 9 { Some(level) } else { None })
287 }
288
289 #[cfg(test)]
290 fn with_test_output() -> (Self, Arc<TestOutput>) {
291 let output = Arc::new(TestOutput::new());
292 let gzip_level =
293 Self::get_compression_level_from_env().unwrap_or(DEFAULT_COMPRESSION_LEVEL);
294 let exporter = Self {
295 gzip_level,
296 resource: None,
297 output: output.clone() as Arc<dyn Output>,
298 };
299 (exporter, output)
300 }
301
302 fn parse_headers() -> HashMap<String, String> {
307 let mut headers = HashMap::new();
308
309 if let Ok(global_headers) = env::var("OTEL_EXPORTER_OTLP_HEADERS") {
311 Self::parse_header_string(&global_headers, &mut headers);
312 }
313
314 if let Ok(trace_headers) = env::var("OTEL_EXPORTER_OTLP_TRACES_HEADERS") {
316 Self::parse_header_string(&trace_headers, &mut headers);
317 }
318
319 headers
320 }
321
322 fn parse_header_string(header_str: &str, headers: &mut HashMap<String, String>) {
329 for pair in header_str.split(',') {
330 if let Some((key, value)) = pair.split_once('=') {
331 let key = key.trim().to_lowercase();
332 if key != "content-type" && key != "content-encoding" {
334 headers.insert(key, value.trim().to_string());
335 }
336 }
337 }
338 }
339
340 fn get_service_name() -> String {
347 env::var("OTEL_SERVICE_NAME")
348 .or_else(|_| env::var("AWS_LAMBDA_FUNCTION_NAME"))
349 .unwrap_or_else(|_| "unknown-service".to_string())
350 }
351}
352
353#[async_trait]
354impl SpanExporter for OtlpStdoutSpanExporter {
355 fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, Result<(), OTelSdkError>> {
372 let result = (|| {
374 let resource = self
376 .resource
377 .clone()
378 .unwrap_or_else(|| opentelemetry_sdk::Resource::builder_empty().build());
379 let resource_attrs = ResourceAttributesWithSchema::from(&resource);
380 let resource_spans = group_spans_by_resource_and_scope(batch, &resource_attrs);
381 let request = ExportTraceServiceRequest { resource_spans };
382
383 let proto_bytes = request.encode_to_vec();
385
386 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.gzip_level as u32));
388 encoder
389 .write_all(&proto_bytes)
390 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
391 let compressed_bytes = encoder
392 .finish()
393 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
394
395 let payload = base64_engine.encode(compressed_bytes);
397
398 let output_data = ExporterOutput {
400 version: VERSION,
401 source: Self::get_service_name(),
402 endpoint: DEFAULT_ENDPOINT,
403 method: "POST",
404 content_type: "application/x-protobuf",
405 content_encoding: "gzip",
406 headers: Self::parse_headers(),
407 payload,
408 base64: true,
409 };
410
411 self.output.write_line(
413 &serde_json::to_string(&output_data)
414 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?,
415 )?;
416
417 Ok(())
418 })();
419
420 Box::pin(std::future::ready(result))
422 }
423
424 fn shutdown(&mut self) -> Result<(), OTelSdkError> {
432 Ok(())
433 }
434
435 fn force_flush(&mut self) -> Result<(), OTelSdkError> {
443 Ok(())
444 }
445
446 fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
456 self.resource = Some(<opentelemetry_sdk::Resource as Into<Resource>>::into(
457 resource.clone(),
458 ));
459 }
460}
461
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::{
        trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
        InstrumentationScope, KeyValue,
    };
    use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks};
    use serde_json::Value;
    use std::time::SystemTime;

    /// Serializes the tests whose assertions depend on process-wide
    /// environment variables. The test harness runs tests on several threads,
    /// so without this lock one test could observe a value another test set.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquires [`ENV_LOCK`], recovering from poisoning so that one failed
    /// test does not cascade into failures of every other environment test.
    fn lock_env() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Builds a minimal, fully populated span used as test input.
    fn create_test_span() -> SpanData {
        let trace_id_bytes = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42];
        let span_id_bytes = [0, 0, 0, 0, 0, 0, 0, 123];
        let parent_id_bytes = [0, 0, 0, 0, 0, 0, 0, 42];

        let span_context = SpanContext::new(
            TraceId::from_bytes(trace_id_bytes),
            SpanId::from_bytes(span_id_bytes),
            TraceFlags::default(),
            false,
            TraceState::default(),
        );

        SpanData {
            span_context,
            parent_span_id: SpanId::from_bytes(parent_id_bytes),
            span_kind: SpanKind::Client,
            name: "test-span".into(),
            start_time: SystemTime::UNIX_EPOCH,
            end_time: SystemTime::UNIX_EPOCH,
            attributes: vec![KeyValue::new("test.key", "test-value")],
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: Status::Ok,
            instrumentation_scope: InstrumentationScope::builder("test-library")
                .with_version("1.0.0")
                .with_schema_url("https://opentelemetry.io/schema/1.0.0")
                .build(),
        }
    }

    /// Decodes the base64 payload of an exported JSON line and returns the
    /// size in bytes of the (still gzip-compressed) data.
    fn extract_payload_size(json_str: &str) -> usize {
        let json: Value = serde_json::from_str(json_str).unwrap();
        let payload = json["payload"].as_str().unwrap();
        base64_engine.decode(payload).unwrap().len()
    }

    /// Fully decodes an exported JSON line (base64 -> gzip -> protobuf) and
    /// returns the total number of spans it contains.
    fn decode_and_count_spans(json_str: &str) -> usize {
        let json: Value = serde_json::from_str(json_str).unwrap();
        let payload = json["payload"].as_str().unwrap();
        let decoded = base64_engine.decode(payload).unwrap();

        let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
        let mut decompressed = Vec::new();
        std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();

        let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();

        request
            .resource_spans
            .iter()
            .flat_map(|resource_span| resource_span.scope_spans.iter())
            .map(|scope_span| scope_span.spans.len())
            .sum()
    }

    /// Trace-specific headers override general headers with the same name.
    #[test]
    fn test_parse_headers() {
        let _env = lock_env();

        std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", "key1=value1,key2=value2");
        std::env::set_var(
            "OTEL_EXPORTER_OTLP_TRACES_HEADERS",
            "key2=override,key3=value3",
        );

        let headers = OtlpStdoutSpanExporter::parse_headers();

        assert_eq!(headers.get("key1").unwrap(), "value1");
        assert_eq!(headers.get("key2").unwrap(), "override");
        assert_eq!(headers.get("key3").unwrap(), "value3");

        std::env::remove_var("OTEL_EXPORTER_OTLP_HEADERS");
        std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
    }

    /// `OTEL_SERVICE_NAME` wins over `AWS_LAMBDA_FUNCTION_NAME`, which wins
    /// over the built-in fallback.
    #[test]
    fn test_service_name_resolution() {
        let _env = lock_env();

        std::env::set_var("OTEL_SERVICE_NAME", "otel-service");
        std::env::set_var("AWS_LAMBDA_FUNCTION_NAME", "lambda-function");
        assert_eq!(OtlpStdoutSpanExporter::get_service_name(), "otel-service");

        std::env::remove_var("OTEL_SERVICE_NAME");
        assert_eq!(
            OtlpStdoutSpanExporter::get_service_name(),
            "lambda-function"
        );

        std::env::remove_var("AWS_LAMBDA_FUNCTION_NAME");
        assert_eq!(
            OtlpStdoutSpanExporter::get_service_name(),
            "unknown-service"
        );
    }

    /// Only values that parse as an integer in `0..=9` are accepted.
    #[test]
    fn test_compression_level_from_env() {
        let _env = lock_env();

        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "3");
        assert_eq!(
            OtlpStdoutSpanExporter::get_compression_level_from_env(),
            Some(3)
        );

        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "10");
        assert_eq!(
            OtlpStdoutSpanExporter::get_compression_level_from_env(),
            None
        );

        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "invalid");
        assert_eq!(
            OtlpStdoutSpanExporter::get_compression_level_from_env(),
            None
        );

        std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);
        assert_eq!(
            OtlpStdoutSpanExporter::get_compression_level_from_env(),
            None
        );
    }

    /// `new` picks up the environment level and falls back to the default.
    #[test]
    fn test_new_uses_env_compression_level() {
        let _env = lock_env();

        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "3");
        let exporter = OtlpStdoutSpanExporter::new();
        assert_eq!(exporter.gzip_level, 3);

        std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);
        let exporter = OtlpStdoutSpanExporter::new();
        assert_eq!(exporter.gzip_level, DEFAULT_COMPRESSION_LEVEL);
    }

    /// An explicit level ignores the environment variable.
    #[test]
    fn test_with_gzip_level_overrides_env() {
        let _env = lock_env();

        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "3");

        let exporter = OtlpStdoutSpanExporter::with_gzip_level(8);
        assert_eq!(exporter.gzip_level, 8);

        std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);
    }

    /// Higher gzip levels must not produce larger payloads, and every level
    /// must round-trip all spans.
    #[tokio::test]
    async fn test_compression_level_affects_output_size() {
        let mut spans = Vec::new();
        for i in 0..100 {
            let mut span = create_test_span();
            span.attributes.push(KeyValue::new("index", i));
            // Repetitive data so that compression has something to work with.
            span.attributes.push(KeyValue::new("data", "a".repeat(100)));
            spans.push(span);
        }

        // The level is set on the exporter directly, so the environment
        // variable read by `with_test_output` does not influence this test.
        let (mut no_compression_exporter, no_compression_output) =
            OtlpStdoutSpanExporter::with_test_output();
        no_compression_exporter.gzip_level = 0;
        no_compression_exporter
            .export(spans.clone())
            .await
            .expect("export at level 0 should succeed");
        let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);

        let (mut medium_compression_exporter, medium_compression_output) =
            OtlpStdoutSpanExporter::with_test_output();
        medium_compression_exporter.gzip_level = 5;
        medium_compression_exporter
            .export(spans.clone())
            .await
            .expect("export at level 5 should succeed");
        let medium_compression_size =
            extract_payload_size(&medium_compression_output.get_output()[0]);

        let (mut max_compression_exporter, max_compression_output) =
            OtlpStdoutSpanExporter::with_test_output();
        max_compression_exporter.gzip_level = 9;
        max_compression_exporter
            .export(spans.clone())
            .await
            .expect("export at level 9 should succeed");
        let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);

        assert!(no_compression_size > medium_compression_size,
            "Medium compression (level 5) should produce smaller output than no compression (level 0). Got {} vs {}",
            medium_compression_size, no_compression_size);

        assert!(medium_compression_size >= max_compression_size,
            "Maximum compression (level 9) should produce output no larger than medium compression (level 5). Got {} vs {}",
            max_compression_size, medium_compression_size);

        let no_compression_spans = decode_and_count_spans(&no_compression_output.get_output()[0]);
        let medium_compression_spans =
            decode_and_count_spans(&medium_compression_output.get_output()[0]);
        let max_compression_spans = decode_and_count_spans(&max_compression_output.get_output()[0]);

        assert_eq!(
            no_compression_spans,
            spans.len(),
            "No compression output should contain all spans"
        );
        assert_eq!(
            medium_compression_spans,
            spans.len(),
            "Medium compression output should contain all spans"
        );
        assert_eq!(
            max_compression_spans,
            spans.len(),
            "Maximum compression output should contain all spans"
        );
    }

    /// A single span produces one JSON line with the expected envelope fields
    /// and a payload that decodes back to one resource-span group.
    #[tokio::test]
    async fn test_export_single_span() {
        let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
        let span = create_test_span();

        let result = exporter.export(vec![span]).await;
        assert!(result.is_ok());

        let output = output.get_output();
        assert_eq!(output.len(), 1);

        let json: Value = serde_json::from_str(&output[0]).unwrap();
        assert_eq!(json["__otel_otlp_stdout"], VERSION);
        assert_eq!(json["method"], "POST");
        assert_eq!(json["content-type"], "application/x-protobuf");
        assert_eq!(json["content-encoding"], "gzip");
        assert_eq!(json["base64"], true);

        let payload = json["payload"].as_str().unwrap();
        let decoded = base64_engine.decode(payload).unwrap();

        let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
        let mut decompressed = Vec::new();
        std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();

        let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
        assert_eq!(request.resource_spans.len(), 1);
    }

    /// Exporting an empty batch succeeds.
    #[tokio::test]
    async fn test_export_empty_batch() {
        let mut exporter = OtlpStdoutSpanExporter::new();
        let result = exporter.export(vec![]).await;
        assert!(result.is_ok());
    }

    /// The explicit constructor stores the requested level.
    #[test]
    fn test_gzip_level_configuration() {
        let exporter = OtlpStdoutSpanExporter::with_gzip_level(9);
        assert_eq!(exporter.gzip_level, 9);
    }

    /// The environment level is honoured at construction time, invalid values
    /// fall back to the default, and a level set on the exporter wins.
    // The guard is deliberately held across `.await`: the export future is
    // already resolved and the default `#[tokio::test]` runtime is
    // single-threaded, so nothing else can contend for the lock while awaiting.
    #[allow(clippy::await_holding_lock)]
    #[tokio::test]
    async fn test_env_var_affects_export_compression() {
        let _env = lock_env();

        let span = create_test_span();
        let spans = vec![span];

        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "0");
        let (mut env_exporter_0, env_output_0) = OtlpStdoutSpanExporter::with_test_output();
        env_exporter_0
            .export(spans.clone())
            .await
            .expect("export with env level 0 should succeed");
        let env_size_0 = extract_payload_size(&env_output_0.get_output()[0]);

        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "9");
        let (mut env_exporter_9, env_output_9) = OtlpStdoutSpanExporter::with_test_output();
        env_exporter_9
            .export(spans.clone())
            .await
            .expect("export with env level 9 should succeed");
        let env_size_9 = extract_payload_size(&env_output_9.get_output()[0]);

        assert!(env_size_0 > env_size_9,
            "Environment variable COMPRESSION_LEVEL=9 should produce smaller output than COMPRESSION_LEVEL=0. Got {} vs {}",
            env_size_9, env_size_0);

        // An unparsable value must fall back to the default level and still export.
        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "invalid");
        let (mut env_exporter_invalid, _env_output_invalid) =
            OtlpStdoutSpanExporter::with_test_output();
        assert_eq!(env_exporter_invalid.gzip_level, DEFAULT_COMPRESSION_LEVEL);
        env_exporter_invalid
            .export(spans.clone())
            .await
            .expect("export with an invalid env level should succeed");

        std::env::set_var(COMPRESSION_LEVEL_ENV_VAR, "0");
        let (mut explicit_exporter, explicit_output) = OtlpStdoutSpanExporter::with_test_output();
        explicit_exporter.gzip_level = 9;
        explicit_exporter
            .export(spans.clone())
            .await
            .expect("export with an explicit level should succeed");
        let explicit_size = extract_payload_size(&explicit_output.get_output()[0]);

        assert!(env_size_0 > explicit_size,
            "Explicit level 9 should produce smaller output than environment variable level 0. Got {} vs {}",
            explicit_size, env_size_0);

        std::env::remove_var(COMPRESSION_LEVEL_ENV_VAR);
    }
}