otlp_stdout_client/
lib.rs

1//! # otlp-stdout-client
2//!
3//! `otlp-stdout-client` is a Rust library that provides an OpenTelemetry exporter
4//! designed for serverless environments, particularly AWS Lambda functions. This crate
5//! is part of the [serverless-otlp-forwarder](https://github.com/dev7a/serverless-otlp-forwarder/)
6//! project, which provides a comprehensive solution for OpenTelemetry telemetry collection
7//! in AWS Lambda environments.
8//!
//! This crate implements the `opentelemetry_http::HttpClient` trait, so it can be plugged
//! into an OpenTelemetry OTLP HTTP exporter to write OTLP data (both JSON and Protobuf
//! formats) to stdout instead of sending it over the network. The output can then be
//! ingested and forwarded to an OTLP collector.
12//!
13//! ## Key Features
14//!
15//! - Implements `opentelemetry_http::HttpClient` for use in OTLP pipelines
16//! - Exports OpenTelemetry data to stdout in a structured format
17//! - Designed for serverless environments, especially AWS Lambda
18//! - Configurable through environment variables
19//! - Optional GZIP compression of payloads
//! - Supports both JSON and Protobuf payloads
//!
//! ## Usage
22//!
23//! This library can be integrated into OpenTelemetry OTLP pipelines to redirect
24//! telemetry data to stdout. It's particularly useful in serverless environments
25//! where direct network access to a collector might not be available or desired.
26//!
27//! ```rust
28//! use otlp_stdout_client::StdoutClient;
29//! use opentelemetry_sdk::trace::SdkTracerProvider;
30//! use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
31//! use opentelemetry::trace::Tracer;
32//! use opentelemetry::global;
33//!
34//! fn init_tracer_provider() -> Result<SdkTracerProvider, Box<dyn std::error::Error>> {
35//!     let exporter = opentelemetry_otlp::SpanExporter::builder()
36//!         .with_http()
37//!         .with_http_client(StdoutClient::default())
38//!         .build()?;
39//!     
40//!     let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
41//!         .with_simple_exporter(exporter)
42//!         .build();
43//!
44//!     Ok(tracer_provider)
45//! }
46//!
47//! #[tokio::main]
48//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
49//!     let tracer_provider = init_tracer_provider()?;
50//!     global::set_tracer_provider(tracer_provider.clone());
51//!     
52//!     let tracer = global::tracer("my_tracer");
53//!     
54//!     // Use the tracer for instrumenting your code
55//!     // For example:
56//!     tracer.in_span("example_span", |_cx| {
57//!         // Your code here
58//!     });
59//!
60//!     Ok(())
61//! }
62//! ```
63//!
64//! ## Configuration
65//!
66//! The exporter can be configured using the following standard OTEL environment variables:
67//!
68//! - `OTEL_EXPORTER_OTLP_PROTOCOL`: Specifies the protocol to use for the OTLP exporter.
69//!   Valid values are:
70//!   - "http/json" (default): Uses HTTP with JSON payload
71//!   - "http/protobuf": Uses HTTP with Protobuf payload
72//!
73//! - `OTEL_EXPORTER_OTLP_ENDPOINT`: Sets the endpoint for the OTLP exporter.
74//!   Specify the endpoint of your OTLP collector.
75//!
76//! - `OTEL_EXPORTER_OTLP_HEADERS`: Sets additional headers for the OTLP exporter.
77//!   Format: "key1=value1,key2=value2"
78//!
79//! - `OTEL_EXPORTER_OTLP_COMPRESSION`: Specifies the compression algorithm to use.
80//!   Valid values are:
81//!   - "gzip": Compresses the payload using GZIP
//!   - Any other value, or leaving the variable unset, disables compression
//!     (the value is matched case-insensitively, so "GZIP" also enables it)
84//!
85//! For more detailed information on usage and configuration, please refer to the README.md file.
86
87use async_trait::async_trait;
88use base64::{engine::general_purpose, Engine as Base64Engine};
89use bytes::Bytes;
90use flate2::{read::GzDecoder, write::GzEncoder, Compression};
91use http::{Request, Response};
92
93use opentelemetry_http::HttpClient;
94use serde::{Deserialize, Serialize};
95use serde_json::Value;
96use std::collections::HashMap;
97use std::sync::Arc;
98use std::{env, error::Error as StdError, fmt::Debug, io::Read};
99use tokio::io::AsyncWrite;
100use tokio::io::AsyncWriteExt;
101use tokio::sync::Mutex;
102
// ---- HTTP header names -------------------------------------------------------

/// Name of the HTTP header that carries the request body's MIME type.
pub const CONTENT_TYPE_HEADER: &str = "content-type";
/// Name of the HTTP header that carries the request body's content coding.
pub const CONTENT_ENCODING_HEADER: &str = "content-encoding";

// ---- MIME types --------------------------------------------------------------

/// MIME type of OTLP/JSON request bodies.
pub const CONTENT_TYPE_JSON: &str = "application/json";
/// MIME type of OTLP/Protobuf request bodies.
pub const CONTENT_TYPE_PROTOBUF: &str = "application/x-protobuf";

// ---- Content codings ---------------------------------------------------------

/// Content-coding token for GZIP compression.
pub const ENCODING_GZIP: &str = "gzip";

// ---- JSON keys (these match the serialized `LogRecord` field names) ----------

/// Key holding the emitting service's name.
pub const KEY_SOURCE: &str = "source";
/// Key holding the URI the OTLP request targeted.
pub const KEY_ENDPOINT: &str = "endpoint";
/// Key holding the HTTP method of the OTLP request.
pub const KEY_METHOD: &str = "method";
/// Key holding the (possibly base64-encoded) request payload.
pub const KEY_PAYLOAD: &str = "payload";
/// Key holding the flag that says whether the payload is base64-encoded.
pub const KEY_BASE64: &str = "base64";
/// Key holding the request headers.
pub const KEY_HEADERS: &str = "headers";

// ---- Versioning --------------------------------------------------------------

/// Prefix for OTLP-stdout version identifiers.
pub const OTEL_VERSION_PREFIX: &str = "otlp-stdout-";
124
/// One OTLP HTTP request, rendered as a single JSON log line by `StdoutClient`.
///
/// Field order is significant: serde serializes struct fields in declaration
/// order, so reordering fields changes the emitted JSON.
#[derive(Debug, Serialize, Deserialize)]
pub struct LogRecord {
    /// Identifier of the producing library, serialized as `__otel_otlp_stdout`.
    /// `StdoutClient` fills it with `"{package}@{version}"`.
    #[serde(rename = "__otel_otlp_stdout")]
    pub _otel: String,
    /// Name of the service that emitted the telemetry.
    pub source: String,
    /// URI the OTLP request was addressed to.
    pub endpoint: String,
    /// HTTP method of the OTLP request.
    pub method: String,
    /// The request payload. It is inline JSON, or a base64 string when `base64` is `true`.
    pub payload: Value,
    /// Headers of the original request. Omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<HashMap<String, String>>,
    /// Content type of the original request, serialized as `content-type`.
    #[serde(rename = "content-type")]
    pub content_type: String,
    /// The client's output compression (`"gzip"` when enabled), serialized as
    /// `content-encoding`. Omitted when `None`.
    #[serde(rename = "content-encoding", skip_serializing_if = "Option::is_none")]
    pub content_encoding: Option<String>,
    /// Whether `payload` is a base64-encoded string. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub base64: Option<bool>,
}
142pub struct StdoutClient {
143    service_name: String,
144    content_encoding_gzip: Option<String>,
145    writer: Arc<Mutex<Box<dyn AsyncWrite + Send + Sync + Unpin>>>,
146    version_identifier: String,
147}
148
149impl Debug for StdoutClient {
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        f.debug_struct("StdoutClient")
152            .field("content_encoding_gzip", &self.content_encoding_gzip)
153            .field("writer", &"Box<dyn Write + Send + Sync>")
154            .field("service_name", &self.service_name)
155            .finish()
156    }
157}
158
159impl StdoutClient {
160    /// Creates a new `StdoutClient` with the default writer (`stdout`).
161    ///
162    /// # Example
163    ///
164    /// ```rust
165    /// use otlp_stdout_client::StdoutClient;
166    ///
167    /// let client = StdoutClient::new();
168    /// ```
169    pub fn new() -> Self {
170        StdoutClient {
171            content_encoding_gzip: Self::parse_compression(),
172            writer: Arc::new(Mutex::new(Box::new(tokio::io::stdout()))),
173            service_name: Self::get_service_name(),
174            version_identifier: Self::get_version_identifier(),
175        }
176    }
177
178    /// Creates a new `StdoutClient` with a custom writer.
179    ///
180    /// This method allows you to specify an alternative writer, such as a file or an in-memory buffer.
181    ///
182    /// # Arguments
183    ///
184    /// * `writer` - Any writer implementing `AsyncWrite + Send + Sync + Unpin + 'static`.
185    ///
186    /// # Example
187    ///
188    /// ```rust,no_run
189    /// use otlp_stdout_client::StdoutClient;
190    /// use tokio::fs::File;
191    ///
192    /// # async fn example() -> std::io::Result<()> {
193    /// // Using a file as the writer
194    /// let file = File::create("output.log").await?;
195    /// let client = StdoutClient::new_with_writer(file);
196    /// # Ok(())
197    /// # }
198    /// ```
199    pub fn new_with_writer<W>(writer: W) -> Self
200    where
201        W: AsyncWrite + Send + Sync + Unpin + 'static,
202    {
203        StdoutClient {
204            content_encoding_gzip: Self::parse_compression(),
205            writer: Arc::new(Mutex::new(Box::new(writer))),
206            service_name: Self::get_service_name(),
207            version_identifier: Self::get_version_identifier(),
208        }
209    }
210
211    /// Parses the compression setting from the environment variable.
212    ///
213    /// This function reads the `OTEL_EXPORTER_OTLP_COMPRESSION` environment variable
214    /// and returns `Some("gzip")` if the value is "gzip", otherwise it returns `None`.
215    ///
216    /// # Returns
217    ///
218    /// An `Option<String>` containing "gzip" if compression is enabled, or `None` otherwise.
219    fn parse_compression() -> Option<String> {
220        env::var("OTEL_EXPORTER_OTLP_COMPRESSION")
221            .ok()
222            .filter(|v| v.eq_ignore_ascii_case(ENCODING_GZIP))
223            .map(|_| ENCODING_GZIP.to_string())
224    }
225
226    /// Gets the service name from environment variables.
227    ///
228    /// This function reads the `OTEL_SERVICE_NAME` or `AWS_LAMBDA_FUNCTION_NAME` environment variable
229    /// and returns its value. If neither is set, it returns "unknown-service".
230    ///
231    /// # Returns
232    ///
233    /// A String containing the service name.
234    fn get_service_name() -> String {
235        env::var("OTEL_SERVICE_NAME")
236            .or_else(|_| env::var("AWS_LAMBDA_FUNCTION_NAME"))
237            .unwrap_or_else(|_| "unknown-service".to_string())
238    }
239
240    /// Processes the HTTP request payload, handling compression, decompression,
241    /// and JSON optimization based on content type and encoding.
242    ///
243    /// Logic flow:
244    /// 1. For JSON payloads:
245    ///    - If input is gzipped, decompress it
246    ///    - Always optimize the JSON
247    ///    - Only base64 encode if we're going to compress it for output
248    ///
249    /// 2. For non-JSON payloads (protobuf):
250    ///    - Always base64 encode (since it's binary)
251    ///    - Keep the original payload as-is
252    ///    - If output compression is enabled, compress it
253    ///
254    /// 3. For all payloads:
255    ///    - If output compression is enabled, compress and mark for base64 encoding
256    ///    - Base64 encode if either:
257    ///      - It's a binary payload (protobuf)
258    ///      - We compressed it for output
259    ///
260    /// # Arguments
261    ///
262    /// * `request` - The incoming HTTP request containing the payload
263    /// * `content_type` - The MIME content type of the payload
264    /// * `content_encoding` - The content encoding of the payload, if any
265    ///
266    /// # Returns
267    ///
268    /// A `Result` indicating success or failure
269    async fn process_payload(
270        &self,
271        request: &Request<bytes::Bytes>,
272        content_type: &str,
273        content_encoding: Option<&str>,
274    ) -> Result<(), Box<dyn StdError + Send + Sync>> {
275        let is_input_gzipped = content_encoding == Some(ENCODING_GZIP);
276        let is_json = content_type == CONTENT_TYPE_JSON;
277        let mut should_encode_base64 = false;
278
279        let mut payload = request.body().clone();
280        // if input is json, optionally decompress it and optimize it
281        if is_json {
282            let decompressed = if is_input_gzipped {
283                Self::decompress_payload(request.body())?.into()
284            } else {
285                request.body().clone()
286            };
287            payload = Self::optimize_json(&decompressed)?.into();
288        } else {
289            // if input is not json, we need to encode it as base64 in any case
290            should_encode_base64 = true;
291        }
292
293        // Compress payload if output compression is enabled
294        if self.content_encoding_gzip.is_some() {
295            payload = Self::compress_payload(&payload)?.into();
296            // if we compressed, we need to encode as base64
297            should_encode_base64 = true;
298        }
299
300        // Prepare final payload so it can be serialized to json
301        let final_payload = if should_encode_base64 {
302            Value::String(Self::encode_base64(&payload))
303        } else {
304            serde_json::from_slice(&payload)?
305        };
306
307        // Create the log record
308        let log_record = LogRecord {
309            _otel: self.version_identifier.clone(),
310            source: self.service_name.clone(),
311            endpoint: request.uri().to_string(),
312            method: request.method().to_string(),
313            payload: final_payload,
314            headers: Some(Self::headers_to_hashmap(request.headers())),
315            content_type: content_type.to_string(),
316            content_encoding: self.content_encoding_gzip.clone(),
317            base64: Some(should_encode_base64),
318        };
319
320        // Write the log record
321        let mut writer = self.writer.lock().await;
322        let json = format!("{}\n", serde_json::to_string(&log_record)?);
323        writer.write_all(json.as_bytes()).await?;
324        writer.flush().await?;
325
326        Ok(())
327    }
328
329    /// Decompresses a GZIP-compressed payload.
330    ///
331    /// # Arguments
332    ///
333    /// * `payload` - The compressed payload as a byte slice.
334    ///
335    /// # Returns
336    ///
337    /// A `Result` containing the decompressed payload as a `Vec<u8>`.
338    fn decompress_payload(payload: &[u8]) -> Result<Vec<u8>, Box<dyn StdError + Send + Sync>> {
339        let mut decoder = GzDecoder::new(payload);
340        let mut decompressed = Vec::new();
341        decoder.read_to_end(&mut decompressed)?;
342        Ok(decompressed)
343    }
344
345    /// Optimizes a JSON payload by parsing and re-serializing it.
346    ///
347    /// This removes unnecessary whitespace and ensures a consistent JSON format.
348    ///
349    /// # Arguments
350    ///
351    /// * `payload` - The JSON payload as a byte slice.
352    ///
353    /// # Returns
354    ///
355    /// A `Result` containing the optimized JSON payload as a `Vec<u8>`.
356    fn optimize_json(payload: &[u8]) -> Result<Vec<u8>, Box<dyn StdError + Send + Sync>> {
357        let json_value: Value = serde_json::from_slice(payload)?;
358        let optimized = serde_json::to_vec(&json_value)?;
359        Ok(optimized)
360    }
361
362    /// Compresses the payload using GZIP compression.
363    ///
364    /// # Arguments
365    ///
366    /// * `payload` - The payload as a byte slice.
367    ///
368    /// # Returns
369    ///
370    /// A `Result` containing the compressed payload as a `Vec<u8>`.
371    fn compress_payload(payload: &[u8]) -> Result<Vec<u8>, Box<dyn StdError + Send + Sync>> {
372        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
373        std::io::Write::write_all(&mut encoder, payload)?;
374        Ok(encoder.finish()?)
375    }
376
377    /// Encodes a byte slice to a base64 string.
378    ///
379    /// This function takes a byte slice and encodes it to a base64 string
380    /// using the standard base64 alphabet.
381    ///
382    /// # Arguments
383    ///
384    /// * `payload` - A byte slice containing the data to be encoded.
385    ///
386    /// # Returns
387    ///
388    /// A `String` containing the base64 encoded representation of the input payload.
389    fn encode_base64(payload: &[u8]) -> String {
390        general_purpose::STANDARD.encode(payload)
391    }
392
393    /// Converts HTTP headers to a `HashMap` with lowercase header names.
394    ///
395    /// # Arguments
396    ///
397    /// * `headers` - The HTTP headers from the request.
398    ///
399    /// # Returns
400    ///
401    /// A `HashMap` mapping header names to their values.
402    fn headers_to_hashmap(headers: &http::HeaderMap) -> HashMap<String, String> {
403        headers
404            .iter()
405            .filter_map(|(name, value)| {
406                value
407                    .to_str()
408                    .ok()
409                    .map(|v| (name.as_str().to_lowercase(), v.to_string()))
410            })
411            .collect()
412    }
413
414    /// Gets the version identifier string for the client.
415    ///
416    /// This function returns a string containing the package name and version,
417    /// formatted as "{package_name}@{version}". The values are obtained from
418    /// cargo environment variables at compile time.
419    ///
420    /// # Returns
421    ///
422    /// A String containing the version identifier in the format "package@version"
423    fn get_version_identifier() -> String {
424        format!("{}@{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
425    }
426}
427
428impl Default for StdoutClient {
429    fn default() -> Self {
430        Self::new()
431    }
432}
433
434/// Implements the `HttpClient` trait for `StdoutClient`.
435///
436/// This implementation allows the `StdoutClient` to be used as an HTTP client
437/// for sending OTLP (OpenTelemetry Protocol) data. It processes the request body
438/// and writes it to stdout in a JSON format suitable for log parsing.
439///
440/// The `send` method handles both JSON and non-JSON payloads, formatting them
441/// appropriately for stdout output.
442#[async_trait]
443impl HttpClient for StdoutClient {
444    async fn send_bytes(
445        &self,
446        request: Request<bytes::Bytes>,
447    ) -> Result<Response<Bytes>, Box<dyn StdError + Send + Sync>> {
448        let headers = request.headers();
449        let content_type = headers
450            .get(CONTENT_TYPE_HEADER)
451            .and_then(|ct| ct.to_str().ok());
452        let content_encoding = headers
453            .get(CONTENT_ENCODING_HEADER)
454            .and_then(|ct| ct.to_str().ok());
455
456        match content_type {
457            Some(content_type) => {
458                self.process_payload(&request, content_type, content_encoding)
459                    .await?;
460            }
461            _ => {
462                let message = match content_type {
463                    Some(ct) => format!("Content type '{}' is not supported", ct),
464                    None => "Content type not specified".to_string(),
465                };
466                tracing::warn!("{message}. Skipping processing.");
467                return Ok(Response::builder().status(200).body(Bytes::new()).unwrap());
468            }
469        }
470
471        Ok(Response::builder().status(200).body(Bytes::new()).unwrap())
472    }
473}
474
// Unit tests are declared in a separate module file (`tests.rs` or `tests/mod.rs`).
#[cfg(test)]
mod tests;

// When rustdoc runs doctests, the `doc_comment` crate's `doctest!` macro also
// compiles the code blocks in README.md (one directory up from this file,
// presumably the crate root), which keeps the README examples compiling.
#[cfg(doctest)]
extern crate doc_comment;

#[cfg(doctest)]
use doc_comment::doctest;

// Generates a doctest-only module named `readme` from the README contents.
#[cfg(doctest)]
doctest!("../README.md", readme);