use async_trait::async_trait;
use base64::{engine::general_purpose, Engine as Base64Engine};
use bytes::Bytes;
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use http::{Request, Response};
use opentelemetry_http::HttpClient;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::{env, error::Error as StdError, fmt::Debug, io::Read};
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
pub const CONTENT_TYPE_JSON: &str = "application/json";
pub const CONTENT_TYPE_PROTOBUF: &str = "application/x-protobuf";
pub const CONTENT_TYPE_HEADER: &str = "content-type";
pub const CONTENT_ENCODING_HEADER: &str = "content-encoding";
pub const KEY_SOURCE: &str = "source";
pub const KEY_ENDPOINT: &str = "endpoint";
pub const KEY_METHOD: &str = "method";
pub const KEY_PAYLOAD: &str = "payload";
pub const KEY_BASE64: &str = "base64";
pub const KEY_HEADERS: &str = "headers";
pub const OTEL_VERSION_PREFIX: &str = "otlp-stdout-";
pub const ENCODING_GZIP: &str = "gzip";
#[derive(Debug, Serialize, Deserialize)]
pub struct LogRecord {
#[serde(rename = "__otel_otlp_stdout")]
pub _otel: String,
pub source: String,
pub endpoint: String,
pub method: String,
pub payload: Value,
#[serde(skip_serializing_if = "Option::is_none")]
pub headers: Option<HashMap<String, String>>,
#[serde(rename = "content-type")]
pub content_type: String,
#[serde(rename = "content-encoding", skip_serializing_if = "Option::is_none")]
pub content_encoding: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub base64: Option<bool>,
}
pub struct StdoutClient {
service_name: String,
content_encoding_gzip: Option<String>,
writer: Arc<Mutex<Box<dyn AsyncWrite + Send + Sync + Unpin>>>,
version_identifier: String,
}
impl Debug for StdoutClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StdoutClient")
.field("content_encoding_gzip", &self.content_encoding_gzip)
.field("writer", &"Box<dyn Write + Send + Sync>")
.field("service_name", &self.service_name)
.finish()
}
}
impl StdoutClient {
pub fn new() -> Self {
StdoutClient {
content_encoding_gzip: Self::parse_compression(),
writer: Arc::new(Mutex::new(Box::new(tokio::io::stdout()))),
service_name: Self::get_service_name(),
version_identifier: Self::get_version_identifier(),
}
}
pub fn new_with_writer<W>(writer: W) -> Self
where
W: AsyncWrite + Send + Sync + Unpin + 'static,
{
StdoutClient {
content_encoding_gzip: Self::parse_compression(),
writer: Arc::new(Mutex::new(Box::new(writer))),
service_name: Self::get_service_name(),
version_identifier: Self::get_version_identifier(),
}
}
fn parse_compression() -> Option<String> {
env::var("OTEL_EXPORTER_OTLP_COMPRESSION")
.ok()
.filter(|v| v.eq_ignore_ascii_case(ENCODING_GZIP))
.map(|_| ENCODING_GZIP.to_string())
}
fn get_service_name() -> String {
env::var("OTEL_SERVICE_NAME")
.or_else(|_| env::var("AWS_LAMBDA_FUNCTION_NAME"))
.unwrap_or_else(|_| "unknown-service".to_string())
}
async fn process_payload(
&self,
request: &Request<bytes::Bytes>,
content_type: &str,
content_encoding: Option<&str>,
) -> Result<(), Box<dyn StdError + Send + Sync>> {
let is_input_gzipped = content_encoding == Some(ENCODING_GZIP);
let is_json = content_type == CONTENT_TYPE_JSON;
let mut should_encode_base64 = false;
let mut payload = request.body().clone();
if is_json {
let decompressed = if is_input_gzipped {
Self::decompress_payload(request.body())?.into()
} else {
request.body().clone()
};
payload = Self::optimize_json(&decompressed)?.into();
} else {
should_encode_base64 = true;
}
if self.content_encoding_gzip.is_some() {
payload = Self::compress_payload(&payload)?.into();
should_encode_base64 = true;
}
let final_payload = if should_encode_base64 {
Value::String(Self::encode_base64(&payload))
} else {
serde_json::from_slice(&payload)?
};
let log_record = LogRecord {
_otel: self.version_identifier.clone(),
source: self.service_name.clone(),
endpoint: request.uri().to_string(),
method: request.method().to_string(),
payload: final_payload,
headers: Some(Self::headers_to_hashmap(request.headers())),
content_type: content_type.to_string(),
content_encoding: self.content_encoding_gzip.clone(),
base64: Some(should_encode_base64),
};
let mut writer = self.writer.lock().await;
let json = format!("{}\n", serde_json::to_string(&log_record)?);
writer.write_all(json.as_bytes()).await?;
writer.flush().await?;
Ok(())
}
fn decompress_payload(payload: &[u8]) -> Result<Vec<u8>, Box<dyn StdError + Send + Sync>> {
let mut decoder = GzDecoder::new(payload);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)?;
Ok(decompressed)
}
fn optimize_json(payload: &[u8]) -> Result<Vec<u8>, Box<dyn StdError + Send + Sync>> {
let json_value: Value = serde_json::from_slice(payload)?;
let optimized = serde_json::to_vec(&json_value)?;
Ok(optimized)
}
fn compress_payload(payload: &[u8]) -> Result<Vec<u8>, Box<dyn StdError + Send + Sync>> {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
std::io::Write::write_all(&mut encoder, payload)?;
Ok(encoder.finish()?)
}
fn encode_base64(payload: &[u8]) -> String {
general_purpose::STANDARD.encode(payload)
}
fn headers_to_hashmap(headers: &http::HeaderMap) -> HashMap<String, String> {
headers
.iter()
.filter_map(|(name, value)| {
value
.to_str()
.ok()
.map(|v| (name.as_str().to_lowercase(), v.to_string()))
})
.collect()
}
fn get_version_identifier() -> String {
format!("{}@{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
}
}
impl Default for StdoutClient {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl HttpClient for StdoutClient {
async fn send_bytes(
&self,
request: Request<bytes::Bytes>,
) -> Result<Response<Bytes>, Box<dyn StdError + Send + Sync>> {
let headers = request.headers();
let content_type = headers
.get(CONTENT_TYPE_HEADER)
.and_then(|ct| ct.to_str().ok());
let content_encoding = headers
.get(CONTENT_ENCODING_HEADER)
.and_then(|ct| ct.to_str().ok());
match content_type {
Some(content_type) => {
self.process_payload(&request, content_type, content_encoding)
.await?;
}
_ => {
let message = match content_type {
Some(ct) => format!("Content type '{}' is not supported", ct),
None => "Content type not specified".to_string(),
};
tracing::warn!("{message}. Skipping processing.");
return Ok(Response::builder().status(200).body(Bytes::new()).unwrap());
}
}
Ok(Response::builder().status(200).body(Bytes::new()).unwrap())
}
}
#[cfg(test)]
mod tests;
#[cfg(doctest)]
extern crate doc_comment;
#[cfg(doctest)]
use doc_comment::doctest;
#[cfg(doctest)]
doctest!("../README.md", readme);