use super::*;
use base64::engine::general_purpose;
use flate2::read::GzDecoder;
use futures::FutureExt;
use opentelemetry::trace::Tracer;
use opentelemetry_otlp::Protocol;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::WithHttpConfig;
use sealed_test::prelude::*;
use serde_json::Value;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{
io::{Cursor, Read},
sync::Arc,
time::Duration,
};
use tokio::io::AsyncWrite;
use tokio::sync::Mutex;
#[derive(Clone)]
struct TestWriter {
buffer: Arc<Mutex<Cursor<Vec<u8>>>>,
}
impl TestWriter {
fn new() -> Self {
TestWriter {
buffer: Arc::new(Mutex::new(Cursor::new(Vec::new()))),
}
}
async fn get_content(&self) -> String {
let mut buffer = self.buffer.lock().await;
buffer.set_position(0);
let mut content = String::new();
buffer
.read_to_string(&mut content)
.expect("Failed to read buffer");
content
}
}
impl AsyncWrite for TestWriter {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let lock_future = self.buffer.lock();
futures::pin_mut!(lock_future);
let mut guard = futures::ready!(lock_future.poll_unpin(cx));
Poll::Ready(std::io::Write::write(&mut *guard, buf))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let lock_future = self.buffer.lock();
futures::pin_mut!(lock_future);
let mut guard = futures::ready!(lock_future.poll_unpin(cx));
Poll::Ready(std::io::Write::flush(&mut *guard))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
async fn run_tracer_test() -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let protocol = match std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL")
.unwrap_or_default()
.to_lowercase()
.as_str()
{
"http/protobuf" => Protocol::HttpBinary,
"http/json" | "" => Protocol::HttpJson,
unsupported => {
eprintln!(
"Warning: OTEL_EXPORTER_OTLP_PROTOCOL value '{}' is not supported. Defaulting to HTTP JSON.",
unsupported
);
Protocol::HttpJson
}
};
let test_writer = TestWriter::new();
let client = StdoutClient::new_with_writer(test_writer.clone());
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_protocol(protocol)
.with_http_client(client)
.build()?;
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_simple_exporter(exporter)
.build();
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
let tracer = opentelemetry::global::tracer("my_tracer");
tracer.in_span("example_span", |_cx| {
std::thread::sleep(Duration::from_millis(100));
});
let _ = tracer_provider.shutdown();
tokio::time::sleep(Duration::from_millis(100)).await;
let content = test_writer.get_content().await;
Ok(content)
}
fn verify_output(
captured_output: String,
expected_content_type: &str,
expected_content_encoding: Option<&str>,
) {
if captured_output.is_empty() {
panic!("Captured output is empty");
}
let json_output: Value =
serde_json::from_str(&captured_output).expect("Failed to parse JSON output");
assert_eq!(
json_output["content-type"], expected_content_type,
"Content-Type mismatch"
);
assert_eq!(json_output["method"], "POST", "HTTP method mismatch");
assert!(
json_output["__otel_otlp_stdout"]
.as_str()
.unwrap()
.contains("otlp-stdout-client@"),
"OTEL version format mismatch"
);
assert_eq!(
json_output["endpoint"].as_str().unwrap(),
"http://example.com/v1/traces",
"Endpoint mismatch"
);
assert!(json_output.get("payload").is_some(), "Payload is missing");
match expected_content_encoding {
Some(encoding) => {
assert_eq!(
json_output["content-encoding"].as_str(),
Some(encoding),
"Content-Encoding mismatch"
);
}
None => {
assert!(
json_output.get("content-encoding").is_none(),
"Content-Encoding should not be present"
);
}
}
match expected_content_type {
"application/json" => {
if expected_content_encoding == Some("gzip") {
assert!(
json_output["payload"].is_string(),
"Compressed JSON payload should be a string"
);
assert!(
json_output["base64"].as_bool().unwrap_or(false),
"base64 flag should be true for compressed JSON"
);
let decoded = general_purpose::STANDARD
.decode(json_output["payload"].as_str().unwrap())
.expect("Failed to decode base64");
let mut decoder = GzDecoder::new(&decoded[..]);
let mut decompressed = String::new();
decoder
.read_to_string(&mut decompressed)
.expect("Failed to decompress");
let decompressed_json: Value =
serde_json::from_str(&decompressed).expect("Failed to parse decompressed JSON");
assert!(
decompressed_json.is_object(),
"Decompressed JSON payload should be an object"
);
} else {
assert!(
json_output["payload"].is_object(),
"Uncompressed JSON payload should be an object"
);
}
}
"application/x-protobuf" => {
assert!(
json_output["payload"].is_string(),
"Protobuf payload should be a string"
);
assert!(
json_output["base64"].as_bool().unwrap_or(false),
"base64 flag should be true for Protobuf"
);
}
_ => panic!("Unexpected content type"),
}
assert!(
json_output["headers"].get("content-type").is_some(),
"Content-Type header is missing"
);
}
#[tokio::test]
#[sealed_test(env = [
("OTEL_EXPORTER_OTLP_ENDPOINT", "http://example.com/"),
("OTEL_EXPORTER_OTLP_PROTOCOL", "http/json"),
])]
async fn test_stdout_client_send_json_plain() {
let captured_output = run_tracer_test().await.unwrap();
verify_output(captured_output, "application/json", None);
}
#[tokio::test]
#[sealed_test(env = [
("OTEL_EXPORTER_OTLP_COMPRESSION", "gzip"),
("OTEL_EXPORTER_OTLP_ENDPOINT", "http://example.com/"),
("OTEL_EXPORTER_OTLP_PROTOCOL", "http/json"),
])]
async fn test_stdout_client_send_json_gzip() {
let captured_output = run_tracer_test().await.unwrap();
verify_output(captured_output, "application/json", Some("gzip"));
}
#[tokio::test]
#[sealed_test(env = [
("OTEL_EXPORTER_OTLP_ENDPOINT", "http://example.com/"),
("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf"),
])]
async fn test_stdout_client_send_proto_plain() {
let captured_output = run_tracer_test().await.unwrap();
verify_output(captured_output, "application/x-protobuf", None);
}
#[tokio::test]
#[sealed_test(env = [
("OTEL_EXPORTER_OTLP_COMPRESSION", "gzip"),
("OTEL_EXPORTER_OTLP_ENDPOINT", "http://example.com/"),
("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf"),
])]
async fn test_stdout_client_send_proto_gzip() {
let captured_output = run_tracer_test().await.unwrap();
verify_output(captured_output, "application/x-protobuf", Some("gzip"));
}