1use async_trait::async_trait;
111use base64::{engine::general_purpose::STANDARD as base64_engine, Engine};
112use bon::bon;
113use flate2::{write::GzEncoder, Compression};
114use futures_util::future::BoxFuture;
115use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
116use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
117use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
118use opentelemetry_sdk::resource::Resource;
119use opentelemetry_sdk::{
120 error::OTelSdkError,
121 trace::{SpanData, SpanExporter},
122};
123use prost::Message;
124use serde::Serialize;
125
126mod constants;
127use constants::{defaults, env_vars};
128
129pub mod consts {
131 pub use crate::constants::defaults;
137 pub use crate::constants::env_vars;
138 pub use crate::constants::resource_attributes;
139}
140
141const VERSION: &str = env!("CARGO_PKG_VERSION");
142
143trait Output: Send + Sync + std::fmt::Debug {
148 fn write_line(&self, line: &str) -> Result<(), OTelSdkError>;
158}
159
160#[derive(Debug, Default)]
162struct StdOutput;
163
164impl Output for StdOutput {
165 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
166 let stdout = io::stdout();
168 let mut handle = stdout.lock();
169
170 writeln!(handle, "{}", line).map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
172
173 Ok(())
174 }
175}
176
177#[derive(Debug, Serialize)]
182struct ExporterOutput<'a> {
183 #[serde(rename = "__otel_otlp_stdout")]
185 version: &'a str,
186 source: String,
188 endpoint: &'a str,
190 method: &'a str,
192 #[serde(rename = "content-type")]
194 content_type: &'a str,
195 #[serde(rename = "content-encoding")]
197 content_encoding: &'a str,
198 #[serde(skip_serializing_if = "HashMap::is_empty")]
200 headers: HashMap<String, String>,
201 payload: String,
203 base64: bool,
205}
206
207#[derive(Debug)]
231pub struct OtlpStdoutSpanExporter {
232 compression_level: u8,
234 output: Arc<dyn Output>,
236 resource: Option<Resource>,
238}
239
240impl Default for OtlpStdoutSpanExporter {
241 fn default() -> Self {
242 Self::builder().build()
243 }
244}
245#[bon]
246impl OtlpStdoutSpanExporter {
247 #[builder]
266 pub fn new(
267 compression_level: Option<u8>,
268 output: Option<Arc<dyn Output>>,
269 resource: Option<Resource>,
270 ) -> Self {
271 let compression_level = match env::var(env_vars::COMPRESSION_LEVEL) {
273 Ok(value) => match value.parse::<u8>() {
274 Ok(level) if level <= 9 => level,
275 Ok(level) => {
276 log::warn!(
277 "Invalid value in {}: {} (must be 0-9), using fallback",
278 env_vars::COMPRESSION_LEVEL,
279 level
280 );
281 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
282 }
283 Err(_) => {
284 log::warn!(
285 "Failed to parse {}: {}, using fallback",
286 env_vars::COMPRESSION_LEVEL,
287 value
288 );
289 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
290 }
291 },
292 Err(_) => {
293 compression_level.unwrap_or(defaults::COMPRESSION_LEVEL)
295 }
296 };
297
298 Self {
299 compression_level,
300 resource,
301 output: output.unwrap_or(Arc::new(StdOutput)),
302 }
303 }
304
305 fn get_service_name() -> String {
313 env::var(env_vars::SERVICE_NAME)
314 .or_else(|_| env::var(env_vars::AWS_LAMBDA_FUNCTION_NAME))
315 .unwrap_or_else(|_| defaults::SERVICE_NAME.to_string())
316 }
317
318 #[cfg(test)]
319 fn with_test_output() -> (Self, Arc<TestOutput>) {
320 let output = Arc::new(TestOutput::new());
321
322 let exporter = Self::builder().output(output.clone()).build();
324
325 (exporter, output)
326 }
327
328 fn parse_headers() -> HashMap<String, String> {
333 let mut headers = HashMap::new();
334
335 if let Ok(global_headers) = env::var("OTEL_EXPORTER_OTLP_HEADERS") {
337 Self::parse_header_string(&global_headers, &mut headers);
338 }
339
340 if let Ok(trace_headers) = env::var("OTEL_EXPORTER_OTLP_TRACES_HEADERS") {
342 Self::parse_header_string(&trace_headers, &mut headers);
343 }
344
345 headers
346 }
347
348 fn parse_header_string(header_str: &str, headers: &mut HashMap<String, String>) {
355 for pair in header_str.split(',') {
356 if let Some((key, value)) = pair.split_once('=') {
357 let key = key.trim().to_lowercase();
358 if key != "content-type" && key != "content-encoding" {
360 headers.insert(key, value.trim().to_string());
361 }
362 }
363 }
364 }
365}
366
367#[async_trait]
368impl SpanExporter for OtlpStdoutSpanExporter {
369 fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, Result<(), OTelSdkError>> {
386 let result = (|| {
388 let resource = self
390 .resource
391 .clone()
392 .unwrap_or_else(|| opentelemetry_sdk::Resource::builder_empty().build());
393 let resource_attrs = ResourceAttributesWithSchema::from(&resource);
394 let resource_spans = group_spans_by_resource_and_scope(batch, &resource_attrs);
395 let request = ExportTraceServiceRequest { resource_spans };
396
397 let proto_bytes = request.encode_to_vec();
399
400 let mut encoder =
402 GzEncoder::new(Vec::new(), Compression::new(self.compression_level as u32));
403 encoder
404 .write_all(&proto_bytes)
405 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
406 let compressed_bytes = encoder
407 .finish()
408 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
409
410 let payload = base64_engine.encode(compressed_bytes);
412
413 let output_data = ExporterOutput {
415 version: VERSION,
416 source: Self::get_service_name(),
417 endpoint: defaults::ENDPOINT,
418 method: "POST",
419 content_type: "application/x-protobuf",
420 content_encoding: "gzip",
421 headers: Self::parse_headers(),
422 payload,
423 base64: true,
424 };
425
426 self.output.write_line(
428 &serde_json::to_string(&output_data)
429 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?,
430 )?;
431
432 Ok(())
433 })();
434
435 Box::pin(std::future::ready(result))
437 }
438
439 fn shutdown(&mut self) -> Result<(), OTelSdkError> {
447 Ok(())
448 }
449
450 fn force_flush(&mut self) -> Result<(), OTelSdkError> {
458 Ok(())
459 }
460
461 fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
471 self.resource = Some(<opentelemetry_sdk::Resource as Into<Resource>>::into(
472 resource.clone(),
473 ));
474 }
475}
476
477#[cfg(doctest)]
478#[macro_use]
479extern crate doc_comment;
480
481#[cfg(doctest)]
482use doc_comment::doctest;
483
484#[cfg(doctest)]
485doctest!("../README.md", readme);
486
487#[cfg(test)]
488use std::sync::Mutex;
489use std::{
490 collections::HashMap,
491 env,
492 io::{self, Write},
493 result::Result,
494 sync::Arc,
495};
496
497#[cfg(test)]
499#[derive(Debug, Default)]
500struct TestOutput {
501 buffer: Arc<Mutex<Vec<String>>>,
502}
503
504#[cfg(test)]
505impl TestOutput {
506 fn new() -> Self {
507 Self {
508 buffer: Arc::new(Mutex::new(Vec::new())),
509 }
510 }
511
512 fn get_output(&self) -> Vec<String> {
513 self.buffer.lock().unwrap().clone()
514 }
515}
516
517#[cfg(test)]
518impl Output for TestOutput {
519 fn write_line(&self, line: &str) -> Result<(), OTelSdkError> {
520 self.buffer.lock().unwrap().push(line.to_string());
521 Ok(())
522 }
523}
524
525#[cfg(test)]
526mod tests {
527 use super::*;
528 use opentelemetry::{
529 trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
530 InstrumentationScope, KeyValue,
531 };
532 use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks};
533 use serde_json::Value;
534 use serial_test::serial;
535 use std::time::SystemTime;
536
537 fn create_test_span() -> SpanData {
538 let trace_id_bytes = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42];
539 let span_id_bytes = [0, 0, 0, 0, 0, 0, 0, 123];
540 let parent_id_bytes = [0, 0, 0, 0, 0, 0, 0, 42];
541
542 let span_context = SpanContext::new(
543 TraceId::from_bytes(trace_id_bytes),
544 SpanId::from_bytes(span_id_bytes),
545 TraceFlags::default(),
546 false,
547 TraceState::default(),
548 );
549
550 SpanData {
551 span_context,
552 parent_span_id: SpanId::from_bytes(parent_id_bytes),
553 span_kind: SpanKind::Client,
554 name: "test-span".into(),
555 start_time: SystemTime::UNIX_EPOCH,
556 end_time: SystemTime::UNIX_EPOCH,
557 attributes: vec![KeyValue::new("test.key", "test-value")],
558 dropped_attributes_count: 0,
559 events: SpanEvents::default(),
560 links: SpanLinks::default(),
561 status: Status::Ok,
562 instrumentation_scope: InstrumentationScope::builder("test-library")
563 .with_version("1.0.0")
564 .with_schema_url("https://opentelemetry.io/schema/1.0.0")
565 .build(),
566 }
567 }
568
569 #[test]
570 fn test_parse_headers() {
571 std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", "key1=value1,key2=value2");
572 std::env::set_var(
573 "OTEL_EXPORTER_OTLP_TRACES_HEADERS",
574 "key2=override,key3=value3",
575 );
576
577 let headers = OtlpStdoutSpanExporter::parse_headers();
578
579 assert_eq!(headers.get("key1").unwrap(), "value1");
580 assert_eq!(headers.get("key2").unwrap(), "override");
581 assert_eq!(headers.get("key3").unwrap(), "value3");
582
583 std::env::remove_var("OTEL_EXPORTER_OTLP_HEADERS");
585 std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_HEADERS");
586 }
587
588 #[test]
589 fn test_service_name_resolution() {
590 std::env::set_var(env_vars::SERVICE_NAME, "otel-service");
592 std::env::set_var(env_vars::AWS_LAMBDA_FUNCTION_NAME, "lambda-function");
593 assert_eq!(OtlpStdoutSpanExporter::get_service_name(), "otel-service");
594
595 std::env::remove_var(env_vars::SERVICE_NAME);
597 assert_eq!(
598 OtlpStdoutSpanExporter::get_service_name(),
599 "lambda-function"
600 );
601
602 std::env::remove_var(env_vars::AWS_LAMBDA_FUNCTION_NAME);
604 assert_eq!(
605 OtlpStdoutSpanExporter::get_service_name(),
606 defaults::SERVICE_NAME
607 );
608 }
609
610 #[test]
611 fn test_compression_level_precedence() {
612 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
614 let exporter = OtlpStdoutSpanExporter::builder()
615 .compression_level(7)
616 .build();
617 assert_eq!(exporter.compression_level, 3);
618
619 std::env::set_var(env_vars::COMPRESSION_LEVEL, "invalid");
621 let exporter = OtlpStdoutSpanExporter::builder()
622 .compression_level(7)
623 .build();
624 assert_eq!(exporter.compression_level, 7);
625
626 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
628 let exporter = OtlpStdoutSpanExporter::builder()
629 .compression_level(7)
630 .build();
631 assert_eq!(exporter.compression_level, 7);
632
633 let exporter = OtlpStdoutSpanExporter::builder()
635 .compression_level(defaults::COMPRESSION_LEVEL)
636 .build();
637 assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
638 }
639
640 #[test]
641 fn test_new_uses_env_compression_level() {
642 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
644 let exporter = OtlpStdoutSpanExporter::default();
645 assert_eq!(exporter.compression_level, 3);
646
647 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
649 let exporter = OtlpStdoutSpanExporter::default();
650 assert_eq!(exporter.compression_level, defaults::COMPRESSION_LEVEL);
651 }
652
653 #[tokio::test]
654 #[serial]
655 async fn test_compression_level_affects_output_size() {
656 let mut spans = Vec::new();
658 for i in 0..100 {
659 let mut span = create_test_span();
660 span.attributes.push(KeyValue::new("index", i));
662 span.attributes
664 .push(KeyValue::new("data", "a".repeat(1000)));
665 spans.push(span);
666 }
667
668 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
670
671 let no_compression_output = Arc::new(TestOutput::new());
673 let mut no_compression_exporter = OtlpStdoutSpanExporter {
674 compression_level: 0,
675 resource: None,
676 output: no_compression_output.clone() as Arc<dyn Output>,
677 };
678 let _ = no_compression_exporter.export(spans.clone()).await;
679 let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
680
681 let max_compression_output = Arc::new(TestOutput::new());
683 let mut max_compression_exporter = OtlpStdoutSpanExporter {
684 compression_level: 9,
685 resource: None,
686 output: max_compression_output.clone() as Arc<dyn Output>,
687 };
688 let _ = max_compression_exporter.export(spans.clone()).await;
689 let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
690
691 assert!(no_compression_size > max_compression_size,
693 "Maximum compression (level 9) should produce output no larger than no compression (level 0). Got {} vs {}",
694 max_compression_size, no_compression_size);
695
696 let no_compression_spans = decode_and_count_spans(&no_compression_output.get_output()[0]);
698 let max_compression_spans = decode_and_count_spans(&max_compression_output.get_output()[0]);
699
700 assert_eq!(
701 no_compression_spans,
702 spans.len(),
703 "No compression output should contain all spans"
704 );
705 assert_eq!(
706 max_compression_spans,
707 spans.len(),
708 "Maximum compression output should contain all spans"
709 );
710 }
711
712 fn extract_payload_size(json_str: &str) -> usize {
714 let json: Value = serde_json::from_str(json_str).unwrap();
715 let payload = json["payload"].as_str().unwrap();
716 base64_engine.decode(payload).unwrap().len()
717 }
718
719 fn decode_and_count_spans(json_str: &str) -> usize {
721 let json: Value = serde_json::from_str(json_str).unwrap();
722 let payload = json["payload"].as_str().unwrap();
723 let decoded = base64_engine.decode(payload).unwrap();
724
725 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
726 let mut decompressed = Vec::new();
727 std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();
728
729 let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
730
731 let mut span_count = 0;
733 for resource_span in &request.resource_spans {
734 for scope_span in &resource_span.scope_spans {
735 span_count += scope_span.spans.len();
736 }
737 }
738
739 span_count
740 }
741
742 #[tokio::test]
743 async fn test_export_single_span() {
744 let (mut exporter, output) = OtlpStdoutSpanExporter::with_test_output();
745 let span = create_test_span();
746
747 let result = exporter.export(vec![span]).await;
748 assert!(result.is_ok());
749
750 let output = output.get_output();
751 assert_eq!(output.len(), 1);
752
753 let json: Value = serde_json::from_str(&output[0]).unwrap();
755 assert_eq!(json["__otel_otlp_stdout"], VERSION);
756 assert_eq!(json["method"], "POST");
757 assert_eq!(json["content-type"], "application/x-protobuf");
758 assert_eq!(json["content-encoding"], "gzip");
759 assert_eq!(json["base64"], true);
760
761 let payload = json["payload"].as_str().unwrap();
763 let decoded = base64_engine.decode(payload).unwrap();
764
765 let mut decoder = flate2::read::GzDecoder::new(&decoded[..]);
767 let mut decompressed = Vec::new();
768 std::io::Read::read_to_end(&mut decoder, &mut decompressed).unwrap();
769
770 let request = ExportTraceServiceRequest::decode(&*decompressed).unwrap();
772 assert_eq!(request.resource_spans.len(), 1);
773 }
774
775 #[tokio::test]
776 async fn test_export_empty_batch() {
777 let mut exporter = OtlpStdoutSpanExporter::default();
778 let result = exporter.export(vec![]).await;
779 assert!(result.is_ok());
780 }
781
782 #[test]
783 #[serial]
784 fn test_gzip_level_configuration() {
785 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
787
788 let exporter = OtlpStdoutSpanExporter::builder()
790 .compression_level(9)
791 .build();
792 assert_eq!(exporter.compression_level, 9);
793 }
794
795 #[tokio::test]
796 #[serial]
797 async fn test_env_var_affects_export_compression() {
798 let span = create_test_span();
800 let mut spans = Vec::new();
801 for i in 0..100 {
803 let mut span = span.clone();
804 span.attributes
806 .push(KeyValue::new(format!("test-key-{}", i), "a".repeat(1000)));
807 spans.push(span);
808 }
809
810 std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
812 let no_compression_output = Arc::new(TestOutput::new());
813 let mut no_compression_exporter = OtlpStdoutSpanExporter::builder()
814 .compression_level(0)
815 .build();
816 no_compression_exporter.output = no_compression_output.clone() as Arc<dyn Output>;
817 let _ = no_compression_exporter.export(spans.clone()).await;
818 let no_compression_size = extract_payload_size(&no_compression_output.get_output()[0]);
819
820 std::env::set_var(env_vars::COMPRESSION_LEVEL, "9");
822 let max_compression_output = Arc::new(TestOutput::new());
823 let mut max_compression_exporter = OtlpStdoutSpanExporter::builder()
824 .compression_level(9)
825 .build();
826 max_compression_exporter.output = max_compression_output.clone() as Arc<dyn Output>;
827 let _ = max_compression_exporter.export(spans.clone()).await;
828 let max_compression_size = extract_payload_size(&max_compression_output.get_output()[0]);
829
830 assert!(no_compression_size > max_compression_size,
832 "Environment variable COMPRESSION_LEVEL=9 should produce smaller output than COMPRESSION_LEVEL=0. Got {} vs {}",
833 max_compression_size, no_compression_size);
834
835 std::env::set_var(env_vars::COMPRESSION_LEVEL, "0");
837 let explicit_output = Arc::new(TestOutput::new());
838
839 let mut explicit_exporter = OtlpStdoutSpanExporter::default();
841 explicit_exporter.output = explicit_output.clone() as Arc<dyn Output>;
842
843 let _ = explicit_exporter.export(spans.clone()).await;
845 let explicit_size = extract_payload_size(&explicit_output.get_output()[0]);
846
847 assert!(explicit_size > max_compression_size,
850 "Environment variable should take precedence over explicitly set level. Expected size closer to {} but got {}",
851 no_compression_size, explicit_size);
852
853 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
855 }
856
857 #[tokio::test]
858 #[serial]
859 async fn test_environment_variable_precedence() {
860 std::env::set_var(env_vars::COMPRESSION_LEVEL, "3");
862
863 let exporter = OtlpStdoutSpanExporter::builder()
866 .compression_level(9)
867 .build();
868 assert_eq!(exporter.compression_level, 3);
869
870 std::env::remove_var(env_vars::COMPRESSION_LEVEL);
872 let exporter = OtlpStdoutSpanExporter::builder()
873 .compression_level(9)
874 .build();
875 assert_eq!(exporter.compression_level, 9);
876 }
877}