use anyhow::{Context, Result};
use base64::{engine::general_purpose, Engine};
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use otlp_stdout_span_exporter::ExporterOutput;
use prost::Message;
use serde_json::Value;
use std::io::{Read, Write};
use tracing;
#[derive(Clone, Debug)]
pub struct TelemetryData {
pub source: String,
pub endpoint: String,
pub payload: Vec<u8>,
pub content_type: String,
pub content_encoding: Option<String>,
}
impl Default for TelemetryData {
fn default() -> Self {
Self {
source: "unknown".to_string(),
endpoint: "http://localhost:4318/v1/traces".to_string(),
payload: Vec::new(),
content_type: "application/x-protobuf".to_string(),
content_encoding: None, }
}
}
impl TelemetryData {
fn convert_to_protobuf(
payload: Vec<u8>,
content_type: &str,
content_encoding: Option<&str>,
) -> Result<Vec<u8>> {
tracing::debug!(
"Converting payload from {}/{:?} to protobuf",
content_type,
content_encoding
);
let decompressed = if content_encoding == Some("gzip") {
tracing::debug!("Decompressing gzipped payload");
let mut decoder = GzDecoder::new(&payload[..]);
let mut decompressed = Vec::new();
decoder
.read_to_end(&mut decompressed)
.context("Failed to decompress payload")?;
decompressed
} else {
payload
};
match content_type {
"application/x-protobuf" => {
tracing::debug!("Payload already in protobuf format");
Ok(decompressed)
}
"application/json" => {
tracing::debug!("Converting JSON to protobuf");
Self::convert_json_to_protobuf(&decompressed)
}
_ => {
tracing::warn!("Unknown content type: {}, keeping as is", content_type);
Ok(decompressed)
}
}
}
fn convert_json_to_protobuf(json_bytes: &[u8]) -> Result<Vec<u8>> {
let request: ExportTraceServiceRequest = serde_json::from_slice(json_bytes)
.context("Failed to parse JSON as ExportTraceServiceRequest")?;
let protobuf_bytes = request.encode_to_vec();
tracing::debug!(
"Successfully converted JSON to protobuf (size: {} bytes)",
protobuf_bytes.len()
);
Ok(protobuf_bytes)
}
pub fn compress(&mut self, compression_level: u32) -> Result<()> {
if self.content_encoding != Some("gzip".to_string()) {
tracing::debug!("Compressing payload with level {}", compression_level);
let original_size = self.payload.len();
let mut encoder = GzEncoder::new(Vec::new(), Compression::new(compression_level));
encoder
.write_all(&self.payload)
.context("Failed to compress payload")?;
self.payload = encoder.finish().context("Failed to finish compression")?;
self.content_encoding = Some("gzip".to_string());
tracing::debug!(
"Compressed payload from {} to {} bytes",
original_size,
self.payload.len()
);
}
Ok(())
}
pub fn from_log_record(record: ExporterOutput) -> Result<Self> {
let raw_payload = if record.base64 {
general_purpose::STANDARD
.decode(&record.payload)
.context("Failed to decode base64 payload")?
} else {
record.payload.as_bytes().to_vec()
};
let protobuf_payload = Self::convert_to_protobuf(
raw_payload,
&record.content_type,
Some(&record.content_encoding),
)?;
Ok(Self {
source: record.source.clone(),
endpoint: record.endpoint.to_string(),
payload: protobuf_payload,
content_type: "application/x-protobuf".to_string(),
content_encoding: None, })
}
pub fn from_raw_span(span: Value, log_group: &str) -> Result<Self> {
let json_string =
serde_json::to_string(&span).context("Failed to serialize span data to JSON string")?;
let raw_payload = json_string.as_bytes().to_vec();
let protobuf_payload = Self::convert_to_protobuf(raw_payload, "application/json", None)?;
Ok(Self {
source: log_group.to_string(),
endpoint: "http://localhost:4318/v1/traces".to_string(),
payload: protobuf_payload,
content_type: "application/x-protobuf".to_string(),
content_encoding: None, })
}
}
#[cfg(test)]
mod tests {
use super::*;
use base64::{engine::general_purpose, Engine};
use flate2::{write::GzEncoder, Compression};
use otlp_stdout_span_exporter::ExporterOutput;
use serde_json::json;
use std::collections::HashMap;
use std::io::Write;
fn create_test_payload() -> String {
let request = ExportTraceServiceRequest {
resource_spans: vec![],
};
let proto_bytes = request.encode_to_vec();
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&proto_bytes).unwrap();
let compressed_bytes = encoder.finish().unwrap();
general_purpose::STANDARD.encode(compressed_bytes)
}
#[test]
fn test_from_log_record() {
let record = ExporterOutput {
version: "test".to_string(),
source: "test-service".to_string(),
endpoint: "http://example.com".to_string(),
method: "POST".to_string(),
payload: create_test_payload(),
headers: Some(HashMap::new()),
content_type: "application/x-protobuf".to_string(),
content_encoding: "gzip".to_string(),
base64: true,
level: Some("info".to_string()),
};
let telemetry = TelemetryData::from_log_record(record).unwrap();
assert_eq!(telemetry.source, "test-service");
assert_eq!(telemetry.endpoint, "http://example.com");
assert_eq!(telemetry.content_type, "application/x-protobuf");
assert_eq!(telemetry.content_encoding, None);
}
#[test]
fn test_from_raw_span() {
let span = json!({
"resourceSpans": []
});
let telemetry = TelemetryData::from_raw_span(span, "aws/spans").unwrap();
assert_eq!(telemetry.source, "aws/spans");
assert_eq!(telemetry.content_type, "application/x-protobuf");
assert_eq!(telemetry.content_encoding, None); }
#[test]
fn test_compress() {
let mut telemetry = TelemetryData {
source: "test".to_string(),
endpoint: "http://example.com".to_string(),
payload: vec![1, 2, 3, 4, 5],
content_type: "application/x-protobuf".to_string(),
content_encoding: None,
};
telemetry.compress(6).unwrap();
assert_eq!(telemetry.content_encoding, Some("gzip".to_string()));
let mut decoder = GzDecoder::new(&telemetry.payload[..]);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed).unwrap();
assert_eq!(decompressed, vec![1, 2, 3, 4, 5]);
}
#[test]
fn test_convert_to_protobuf_already_protobuf() {
let original_payload = vec![1, 2, 3, 4];
let converted = TelemetryData::convert_to_protobuf(
original_payload.clone(),
"application/x-protobuf",
None,
)
.unwrap();
assert_eq!(converted, original_payload);
}
#[test]
fn test_convert_to_protobuf_from_json() {
let json_data = json!({
"resourceSpans": []
});
let json_bytes = serde_json::to_vec(&json_data).unwrap();
let converted =
TelemetryData::convert_to_protobuf(json_bytes, "application/json", None).unwrap();
let request = ExportTraceServiceRequest::decode(converted.as_slice()).unwrap();
assert_eq!(request.resource_spans.len(), 0);
}
#[test]
fn test_convert_to_protobuf_from_gzipped_json() {
let json_data = json!({
"resourceSpans": []
});
let json_bytes = serde_json::to_vec(&json_data).unwrap();
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&json_bytes).unwrap();
let compressed = encoder.finish().unwrap();
let converted =
TelemetryData::convert_to_protobuf(compressed, "application/json", Some("gzip"))
.unwrap();
let request = ExportTraceServiceRequest::decode(converted.as_slice()).unwrap();
assert_eq!(request.resource_spans.len(), 0);
}
}