otlp_stdout_client/lib.rs
1//! # otlp-stdout-client
2//!
3//! `otlp-stdout-client` is a Rust library that provides an OpenTelemetry exporter
4//! designed for serverless environments, particularly AWS Lambda functions. This crate
5//! is part of the [serverless-otlp-forwarder](https://github.com/dev7a/serverless-otlp-forwarder/)
6//! project, which provides a comprehensive solution for OpenTelemetry telemetry collection
7//! in AWS Lambda environments.
8//!
//! The [`StdoutClient`] type implements the `opentelemetry_http::HttpClient` trait, so it can
//! be plugged into an OpenTelemetry OTLP HTTP exporter in place of a network client. Instead of
//! sending each request, it writes the OTLP data (JSON or Protobuf) to stdout as one JSON line
//! per request, from where it can be ingested and forwarded to an OTLP collector.
12//!
13//! ## Key Features
14//!
15//! - Implements `opentelemetry_http::HttpClient` for use in OTLP pipelines
16//! - Exports OpenTelemetry data to stdout in a structured format
17//! - Designed for serverless environments, especially AWS Lambda
18//! - Configurable through environment variables
19//! - Optional GZIP compression of payloads
//! - Supports both JSON and Protobuf payloads
//!
21//! ## Usage
22//!
23//! This library can be integrated into OpenTelemetry OTLP pipelines to redirect
24//! telemetry data to stdout. It's particularly useful in serverless environments
25//! where direct network access to a collector might not be available or desired.
26//!
27//! ```rust
28//! use otlp_stdout_client::StdoutClient;
29//! use opentelemetry_sdk::trace::SdkTracerProvider;
30//! use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
31//! use opentelemetry::trace::Tracer;
32//! use opentelemetry::global;
33//!
34//! fn init_tracer_provider() -> Result<SdkTracerProvider, Box<dyn std::error::Error>> {
35//! let exporter = opentelemetry_otlp::SpanExporter::builder()
36//! .with_http()
37//! .with_http_client(StdoutClient::default())
38//! .build()?;
39//!
40//! let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
41//! .with_simple_exporter(exporter)
42//! .build();
43//!
44//! Ok(tracer_provider)
45//! }
46//!
47//! #[tokio::main]
48//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
49//! let tracer_provider = init_tracer_provider()?;
50//! global::set_tracer_provider(tracer_provider.clone());
51//!
52//! let tracer = global::tracer("my_tracer");
53//!
54//! // Use the tracer for instrumenting your code
55//! // For example:
56//! tracer.in_span("example_span", |_cx| {
57//! // Your code here
58//! });
59//!
60//! Ok(())
61//! }
62//! ```
63//!
64//! ## Configuration
65//!
66//! The exporter can be configured using the following standard OTEL environment variables:
67//!
68//! - `OTEL_EXPORTER_OTLP_PROTOCOL`: Specifies the protocol to use for the OTLP exporter.
69//! Valid values are:
70//! - "http/json" (default): Uses HTTP with JSON payload
71//! - "http/protobuf": Uses HTTP with Protobuf payload
72//!
//! - `OTEL_EXPORTER_OTLP_ENDPOINT`: Sets the endpoint for the OTLP exporter.
//!   No request is actually sent there; the request URI is recorded in the `endpoint`
//!   field of each emitted record.
75//!
76//! - `OTEL_EXPORTER_OTLP_HEADERS`: Sets additional headers for the OTLP exporter.
77//! Format: "key1=value1,key2=value2"
78//!
79//! - `OTEL_EXPORTER_OTLP_COMPRESSION`: Specifies the compression algorithm to use.
80//! Valid values are:
//!   - "gzip" (case-insensitive): Compresses the payload using GZIP and base64-encodes it
//!   - If unset or set to any other value, no compression is applied
83//!
84//!
85//! For more detailed information on usage and configuration, please refer to the README.md file.
86
87use async_trait::async_trait;
88use base64::{engine::general_purpose, Engine as Base64Engine};
89use bytes::Bytes;
90use flate2::{read::GzDecoder, write::GzEncoder, Compression};
91use http::{Request, Response};
92
93use opentelemetry_http::HttpClient;
94use serde::{Deserialize, Serialize};
95use serde_json::Value;
96use std::collections::HashMap;
97use std::sync::Arc;
98use std::{env, error::Error as StdError, fmt::Debug, io::Read};
99use tokio::io::AsyncWrite;
100use tokio::io::AsyncWriteExt;
101use tokio::sync::Mutex;
102
/// MIME type of OTLP/JSON request bodies.
pub const CONTENT_TYPE_JSON: &str = "application/json";
/// MIME type of OTLP/Protobuf request bodies.
pub const CONTENT_TYPE_PROTOBUF: &str = "application/x-protobuf";

/// Name of the request header that carries the payload's MIME type.
pub const CONTENT_TYPE_HEADER: &str = "content-type";
/// Name of the request header that carries the payload's compression scheme.
pub const CONTENT_ENCODING_HEADER: &str = "content-encoding";

/// JSON key holding the service name in an emitted record.
pub const KEY_SOURCE: &str = "source";
/// JSON key holding the request URI in an emitted record.
pub const KEY_ENDPOINT: &str = "endpoint";
/// JSON key holding the HTTP method in an emitted record.
pub const KEY_METHOD: &str = "method";
/// JSON key holding the (possibly encoded) request body in an emitted record.
pub const KEY_PAYLOAD: &str = "payload";
/// JSON key holding the "payload is base64" flag in an emitted record.
pub const KEY_BASE64: &str = "base64";
/// JSON key holding the request headers in an emitted record.
pub const KEY_HEADERS: &str = "headers";

/// Prefix for the OTEL version identifier.
/// NOTE(review): not referenced in this file; presumably matched by consumers against the
/// `package@version` identifier — confirm.
pub const OTEL_VERSION_PREFIX: &str = "otlp-stdout-";

/// Content-encoding value that denotes GZIP compression.
pub const ENCODING_GZIP: &str = "gzip";
124
125#[derive(Debug, Serialize, Deserialize)]
126pub struct LogRecord {
127 #[serde(rename = "__otel_otlp_stdout")]
128 pub _otel: String,
129 pub source: String,
130 pub endpoint: String,
131 pub method: String,
132 pub payload: Value,
133 #[serde(skip_serializing_if = "Option::is_none")]
134 pub headers: Option<HashMap<String, String>>,
135 #[serde(rename = "content-type")]
136 pub content_type: String,
137 #[serde(rename = "content-encoding", skip_serializing_if = "Option::is_none")]
138 pub content_encoding: Option<String>,
139 #[serde(skip_serializing_if = "Option::is_none")]
140 pub base64: Option<bool>,
141}
142pub struct StdoutClient {
143 service_name: String,
144 content_encoding_gzip: Option<String>,
145 writer: Arc<Mutex<Box<dyn AsyncWrite + Send + Sync + Unpin>>>,
146 version_identifier: String,
147}
148
149impl Debug for StdoutClient {
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 f.debug_struct("StdoutClient")
152 .field("content_encoding_gzip", &self.content_encoding_gzip)
153 .field("writer", &"Box<dyn Write + Send + Sync>")
154 .field("service_name", &self.service_name)
155 .finish()
156 }
157}
158
159impl StdoutClient {
160 /// Creates a new `StdoutClient` with the default writer (`stdout`).
161 ///
162 /// # Example
163 ///
164 /// ```rust
165 /// use otlp_stdout_client::StdoutClient;
166 ///
167 /// let client = StdoutClient::new();
168 /// ```
169 pub fn new() -> Self {
170 StdoutClient {
171 content_encoding_gzip: Self::parse_compression(),
172 writer: Arc::new(Mutex::new(Box::new(tokio::io::stdout()))),
173 service_name: Self::get_service_name(),
174 version_identifier: Self::get_version_identifier(),
175 }
176 }
177
178 /// Creates a new `StdoutClient` with a custom writer.
179 ///
180 /// This method allows you to specify an alternative writer, such as a file or an in-memory buffer.
181 ///
182 /// # Arguments
183 ///
184 /// * `writer` - Any writer implementing `AsyncWrite + Send + Sync + Unpin + 'static`.
185 ///
186 /// # Example
187 ///
188 /// ```rust,no_run
189 /// use otlp_stdout_client::StdoutClient;
190 /// use tokio::fs::File;
191 ///
192 /// # async fn example() -> std::io::Result<()> {
193 /// // Using a file as the writer
194 /// let file = File::create("output.log").await?;
195 /// let client = StdoutClient::new_with_writer(file);
196 /// # Ok(())
197 /// # }
198 /// ```
199 pub fn new_with_writer<W>(writer: W) -> Self
200 where
201 W: AsyncWrite + Send + Sync + Unpin + 'static,
202 {
203 StdoutClient {
204 content_encoding_gzip: Self::parse_compression(),
205 writer: Arc::new(Mutex::new(Box::new(writer))),
206 service_name: Self::get_service_name(),
207 version_identifier: Self::get_version_identifier(),
208 }
209 }
210
211 /// Parses the compression setting from the environment variable.
212 ///
213 /// This function reads the `OTEL_EXPORTER_OTLP_COMPRESSION` environment variable
214 /// and returns `Some("gzip")` if the value is "gzip", otherwise it returns `None`.
215 ///
216 /// # Returns
217 ///
218 /// An `Option<String>` containing "gzip" if compression is enabled, or `None` otherwise.
219 fn parse_compression() -> Option<String> {
220 env::var("OTEL_EXPORTER_OTLP_COMPRESSION")
221 .ok()
222 .filter(|v| v.eq_ignore_ascii_case(ENCODING_GZIP))
223 .map(|_| ENCODING_GZIP.to_string())
224 }
225
226 /// Gets the service name from environment variables.
227 ///
228 /// This function reads the `OTEL_SERVICE_NAME` or `AWS_LAMBDA_FUNCTION_NAME` environment variable
229 /// and returns its value. If neither is set, it returns "unknown-service".
230 ///
231 /// # Returns
232 ///
233 /// A String containing the service name.
234 fn get_service_name() -> String {
235 env::var("OTEL_SERVICE_NAME")
236 .or_else(|_| env::var("AWS_LAMBDA_FUNCTION_NAME"))
237 .unwrap_or_else(|_| "unknown-service".to_string())
238 }
239
240 /// Processes the HTTP request payload, handling compression, decompression,
241 /// and JSON optimization based on content type and encoding.
242 ///
243 /// Logic flow:
244 /// 1. For JSON payloads:
245 /// - If input is gzipped, decompress it
246 /// - Always optimize the JSON
247 /// - Only base64 encode if we're going to compress it for output
248 ///
249 /// 2. For non-JSON payloads (protobuf):
250 /// - Always base64 encode (since it's binary)
251 /// - Keep the original payload as-is
252 /// - If output compression is enabled, compress it
253 ///
254 /// 3. For all payloads:
255 /// - If output compression is enabled, compress and mark for base64 encoding
256 /// - Base64 encode if either:
257 /// - It's a binary payload (protobuf)
258 /// - We compressed it for output
259 ///
260 /// # Arguments
261 ///
262 /// * `request` - The incoming HTTP request containing the payload
263 /// * `content_type` - The MIME content type of the payload
264 /// * `content_encoding` - The content encoding of the payload, if any
265 ///
266 /// # Returns
267 ///
268 /// A `Result` indicating success or failure
269 async fn process_payload(
270 &self,
271 request: &Request<bytes::Bytes>,
272 content_type: &str,
273 content_encoding: Option<&str>,
274 ) -> Result<(), Box<dyn StdError + Send + Sync>> {
275 let is_input_gzipped = content_encoding == Some(ENCODING_GZIP);
276 let is_json = content_type == CONTENT_TYPE_JSON;
277 let mut should_encode_base64 = false;
278
279 let mut payload = request.body().clone();
280 // if input is json, optionally decompress it and optimize it
281 if is_json {
282 let decompressed = if is_input_gzipped {
283 Self::decompress_payload(request.body())?.into()
284 } else {
285 request.body().clone()
286 };
287 payload = Self::optimize_json(&decompressed)?.into();
288 } else {
289 // if input is not json, we need to encode it as base64 in any case
290 should_encode_base64 = true;
291 }
292
293 // Compress payload if output compression is enabled
294 if self.content_encoding_gzip.is_some() {
295 payload = Self::compress_payload(&payload)?.into();
296 // if we compressed, we need to encode as base64
297 should_encode_base64 = true;
298 }
299
300 // Prepare final payload so it can be serialized to json
301 let final_payload = if should_encode_base64 {
302 Value::String(Self::encode_base64(&payload))
303 } else {
304 serde_json::from_slice(&payload)?
305 };
306
307 // Create the log record
308 let log_record = LogRecord {
309 _otel: self.version_identifier.clone(),
310 source: self.service_name.clone(),
311 endpoint: request.uri().to_string(),
312 method: request.method().to_string(),
313 payload: final_payload,
314 headers: Some(Self::headers_to_hashmap(request.headers())),
315 content_type: content_type.to_string(),
316 content_encoding: self.content_encoding_gzip.clone(),
317 base64: Some(should_encode_base64),
318 };
319
320 // Write the log record
321 let mut writer = self.writer.lock().await;
322 let json = format!("{}\n", serde_json::to_string(&log_record)?);
323 writer.write_all(json.as_bytes()).await?;
324 writer.flush().await?;
325
326 Ok(())
327 }
328
329 /// Decompresses a GZIP-compressed payload.
330 ///
331 /// # Arguments
332 ///
333 /// * `payload` - The compressed payload as a byte slice.
334 ///
335 /// # Returns
336 ///
337 /// A `Result` containing the decompressed payload as a `Vec<u8>`.
338 fn decompress_payload(payload: &[u8]) -> Result<Vec<u8>, Box<dyn StdError + Send + Sync>> {
339 let mut decoder = GzDecoder::new(payload);
340 let mut decompressed = Vec::new();
341 decoder.read_to_end(&mut decompressed)?;
342 Ok(decompressed)
343 }
344
345 /// Optimizes a JSON payload by parsing and re-serializing it.
346 ///
347 /// This removes unnecessary whitespace and ensures a consistent JSON format.
348 ///
349 /// # Arguments
350 ///
351 /// * `payload` - The JSON payload as a byte slice.
352 ///
353 /// # Returns
354 ///
355 /// A `Result` containing the optimized JSON payload as a `Vec<u8>`.
356 fn optimize_json(payload: &[u8]) -> Result<Vec<u8>, Box<dyn StdError + Send + Sync>> {
357 let json_value: Value = serde_json::from_slice(payload)?;
358 let optimized = serde_json::to_vec(&json_value)?;
359 Ok(optimized)
360 }
361
362 /// Compresses the payload using GZIP compression.
363 ///
364 /// # Arguments
365 ///
366 /// * `payload` - The payload as a byte slice.
367 ///
368 /// # Returns
369 ///
370 /// A `Result` containing the compressed payload as a `Vec<u8>`.
371 fn compress_payload(payload: &[u8]) -> Result<Vec<u8>, Box<dyn StdError + Send + Sync>> {
372 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
373 std::io::Write::write_all(&mut encoder, payload)?;
374 Ok(encoder.finish()?)
375 }
376
377 /// Encodes a byte slice to a base64 string.
378 ///
379 /// This function takes a byte slice and encodes it to a base64 string
380 /// using the standard base64 alphabet.
381 ///
382 /// # Arguments
383 ///
384 /// * `payload` - A byte slice containing the data to be encoded.
385 ///
386 /// # Returns
387 ///
388 /// A `String` containing the base64 encoded representation of the input payload.
389 fn encode_base64(payload: &[u8]) -> String {
390 general_purpose::STANDARD.encode(payload)
391 }
392
393 /// Converts HTTP headers to a `HashMap` with lowercase header names.
394 ///
395 /// # Arguments
396 ///
397 /// * `headers` - The HTTP headers from the request.
398 ///
399 /// # Returns
400 ///
401 /// A `HashMap` mapping header names to their values.
402 fn headers_to_hashmap(headers: &http::HeaderMap) -> HashMap<String, String> {
403 headers
404 .iter()
405 .filter_map(|(name, value)| {
406 value
407 .to_str()
408 .ok()
409 .map(|v| (name.as_str().to_lowercase(), v.to_string()))
410 })
411 .collect()
412 }
413
414 /// Gets the version identifier string for the client.
415 ///
416 /// This function returns a string containing the package name and version,
417 /// formatted as "{package_name}@{version}". The values are obtained from
418 /// cargo environment variables at compile time.
419 ///
420 /// # Returns
421 ///
422 /// A String containing the version identifier in the format "package@version"
423 fn get_version_identifier() -> String {
424 format!("{}@{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
425 }
426}
427
428impl Default for StdoutClient {
429 fn default() -> Self {
430 Self::new()
431 }
432}
433
434/// Implements the `HttpClient` trait for `StdoutClient`.
435///
436/// This implementation allows the `StdoutClient` to be used as an HTTP client
437/// for sending OTLP (OpenTelemetry Protocol) data. It processes the request body
438/// and writes it to stdout in a JSON format suitable for log parsing.
439///
440/// The `send` method handles both JSON and non-JSON payloads, formatting them
441/// appropriately for stdout output.
442#[async_trait]
443impl HttpClient for StdoutClient {
444 async fn send_bytes(
445 &self,
446 request: Request<bytes::Bytes>,
447 ) -> Result<Response<Bytes>, Box<dyn StdError + Send + Sync>> {
448 let headers = request.headers();
449 let content_type = headers
450 .get(CONTENT_TYPE_HEADER)
451 .and_then(|ct| ct.to_str().ok());
452 let content_encoding = headers
453 .get(CONTENT_ENCODING_HEADER)
454 .and_then(|ct| ct.to_str().ok());
455
456 match content_type {
457 Some(content_type) => {
458 self.process_payload(&request, content_type, content_encoding)
459 .await?;
460 }
461 _ => {
462 let message = match content_type {
463 Some(ct) => format!("Content type '{}' is not supported", ct),
464 None => "Content type not specified".to_string(),
465 };
466 tracing::warn!("{message}. Skipping processing.");
467 return Ok(Response::builder().status(200).body(Bytes::new()).unwrap());
468 }
469 }
470
471 Ok(Response::builder().status(200).body(Bytes::new()).unwrap())
472 }
473}
474
// Unit tests are declared as a separate `tests` module file; they are compiled only for test builds.
#[cfg(test)]
mod tests;

// Doctest-only wiring: `doc_comment::doctest!` presumably compiles the Rust code blocks
// in the crate README as doctests, so the README examples are checked against the API.
// NOTE(review): the README path is relative to this source file — confirm it stays valid.
#[cfg(doctest)]
extern crate doc_comment;

#[cfg(doctest)]
use doc_comment::doctest;

#[cfg(doctest)]
doctest!("../README.md", readme);