§Serverless OTLP Forwarder Core
The serverless-otlp-forwarder-core crate provides essential, shared components for building AWS Lambda functions that process and forward OpenTelemetry (OTLP) data. It is a core part of the Serverless OTLP Forwarder project.
This crate is designed to be used by specific Lambda processor implementations, offering a standardized way to parse, compact, and send OTLP telemetry batches.
§Features
- Standardized Telemetry Handling: Defines a common TelemetryData struct for internal OTLP representation.
- Pluggable Event Parsing: Uses an EventParser trait to allow different Lambda processors for different event sources to implement their specific event decoding logic.
- Efficient Batching: Includes a span_compactor module to merge multiple OTLP messages into a single batch.
- Configurable OTLP Export: Provides an HTTP sender that respects standard OpenTelemetry environment variables for endpoint and header configuration (e.g., OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_HEADERS).
- Simplified Processor Logic: Offers a generic process_event_batch function to orchestrate the parse-compact-send workflow.
- Zero-Boilerplate HTTP Clients: Built-in HTTP client implementations eliminate the need for custom trait implementations in your Lambda functions.
- Optional Instrumentation: Feature-gated support for request tracing and middleware integration.
§Core Components
§TelemetryData
(Located in src/telemetry.rs)
The central struct representing a unit of telemetry data. It normalizes incoming data into an OTLP protobuf format (uncompressed initially) and includes methods for final compression (Gzip). Its fields include source, endpoint (primarily for context, as the actual target is resolved from env vars), payload, content_type, and content_encoding.
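As a rough illustration of the shape described above, the following sketch models a telemetry unit with the five fields named in this section. The type name `TelemetrySketch`, the field types, and both helper methods are assumptions made for illustration; they are not the crate's actual definitions. Gzip compression itself is left out because it needs an external crate (for example `flate2`).

```rust
/// Hypothetical stand-in for the crate's `TelemetryData`; field types are assumed.
#[derive(Debug, Clone, PartialEq)]
pub struct TelemetrySketch {
    pub source: String,
    pub endpoint: String,
    pub payload: Vec<u8>,
    pub content_type: String,
    pub content_encoding: Option<String>,
}

impl TelemetrySketch {
    /// Wraps uncompressed OTLP protobuf bytes; no content encoding is set yet.
    pub fn from_protobuf(source: &str, endpoint: &str, payload: Vec<u8>) -> Self {
        Self {
            source: source.to_string(),
            endpoint: endpoint.to_string(),
            payload,
            // Standard media type for binary OTLP over HTTP.
            content_type: "application/x-protobuf".to_string(),
            content_encoding: None,
        }
    }

    /// Swaps in bytes that were gzip-compressed elsewhere and records the encoding.
    pub fn with_gzip_payload(mut self, compressed: Vec<u8>) -> Self {
        self.payload = compressed;
        self.content_encoding = Some("gzip".to_string());
        self
    }
}
```

Keeping the payload uncompressed until the last step is what makes merging several units possible before a single compression pass.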
§EventParser Trait
(Located in src/core_parser.rs)
A trait that specific Lambda processors must implement to convert their incoming AWS event payloads into a Vec<TelemetryData>.
use anyhow;
use serverless_otlp_forwarder_core::TelemetryData;

pub trait EventParser {
    type EventInput; // The specific AWS event type
    fn parse(&self, event_payload: Self::EventInput, source_identifier: &str) -> anyhow::Result<Vec<TelemetryData>>;
}
§Span Compaction
(Located in src/span_compactor.rs)
- SpanCompactionConfig: Configuration for enabling/disabling compaction, setting max payload size, and Gzip compression level.
- compact_telemetry_payloads(): Takes a Vec<TelemetryData> (expected to contain uncompressed OTLP protobuf payloads) and merges them into a single TelemetryData object, then applies Gzip compression according to the config.
§HTTP Sender
(Located in src/http_sender.rs)
- resolve_otlp_endpoint(): Determines the target OTLP HTTP endpoint by checking OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, then OTEL_EXPORTER_OTLP_ENDPOINT, and finally defaulting to http://localhost:4318/v1/traces. It correctly appends /v1/traces if a base URL is provided via OTEL_EXPORTER_OTLP_ENDPOINT.
- resolve_otlp_headers(): Parses custom HTTP headers from OTEL_EXPORTER_OTLP_TRACES_HEADERS or OTEL_EXPORTER_OTLP_HEADERS (comma-separated key=value format).
- send_telemetry_batch(): Asynchronously sends a (compacted and compressed) TelemetryData payload to the resolved endpoint using the resolved headers.
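The resolution order and header format described above can be sketched with the standard library alone. This is a minimal approximation, not the crate's code: the function names `resolve_endpoint`, `resolve_endpoint_from_env`, and `parse_headers` are hypothetical, and details such as whitespace trimming and silently skipping malformed header pairs are assumptions.

```rust
use std::env;

const DEFAULT_TRACES_ENDPOINT: &str = "http://localhost:4318/v1/traces";

/// Pure resolution logic, kept separate from env access so it is easy to test.
pub fn resolve_endpoint(traces: Option<&str>, base: Option<&str>) -> String {
    // 1. A traces-specific endpoint is used as given.
    if let Some(url) = traces.map(str::trim).filter(|s| !s.is_empty()) {
        return url.to_string();
    }
    // 2. A base endpoint gets `/v1/traces` appended unless the path already ends with it.
    if let Some(url) = base.map(str::trim).filter(|s| !s.is_empty()) {
        let trimmed = url.trim_end_matches('/');
        if trimmed.ends_with("/v1/traces") {
            return trimmed.to_string();
        }
        return format!("{}/v1/traces", trimmed);
    }
    // 3. Neither variable is set: fall back to the local default.
    DEFAULT_TRACES_ENDPOINT.to_string()
}

/// Thin wrapper that reads the two standard environment variables.
pub fn resolve_endpoint_from_env() -> String {
    let traces = env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT").ok();
    let base = env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok();
    resolve_endpoint(traces.as_deref(), base.as_deref())
}

/// Parses `key1=value1,key2=value2`; pairs without `=` or with an empty key are skipped.
pub fn parse_headers(raw: &str) -> Vec<(String, String)> {
    raw.split(',')
        .filter_map(|pair| {
            // Split at the first `=` only, so values may themselves contain `=`.
            let (key, value) = pair.split_once('=')?;
            let key = key.trim();
            if key.is_empty() {
                return None;
            }
            Some((key.to_string(), value.trim().to_string()))
        })
        .collect()
}
```

Splitting at the first `=` matters for values such as base64 credentials, which often end in `=` padding.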
§HTTP Client Options
The crate provides multiple HTTP client options to minimize boilerplate in your Lambda implementations:
§Simple ReqwestClient (Recommended for most use cases)
use reqwest::Client as ReqwestClient;
use std::sync::Arc;
// ReqwestClient implements HttpOtlpForwarderClient out of the box
let http_client = Arc::new(ReqwestClient::new());
§Builder Functions
use serverless_otlp_forwarder_core::client_builder;
use std::sync::Arc;
use std::time::Duration;
// Simple client
let http_client = Arc::new(client_builder::simple());
// With custom timeout
let http_client = Arc::new(client_builder::with_timeout(Duration::from_secs(30)));
§Instrumented Client (Feature: instrumented-client)
For Lambda functions that need request tracing and middleware support:
use reqwest::Client as ReqwestClient;
use reqwest_middleware::ClientBuilder;
use reqwest_tracing::TracingMiddleware;
use serverless_otlp_forwarder_core::InstrumentedHttpClient;
use std::sync::Arc;
// Create with custom middleware
let base_client = ReqwestClient::new();
let middleware_client = ClientBuilder::new(base_client)
    .with(TracingMiddleware::default())
    .build();
let http_client = Arc::new(InstrumentedHttpClient::new(middleware_client));
// Or use the builder (when instrumented-client feature is enabled)
// let http_client = Arc::new(client_builder::instrumented());
Use Case: The instrumented client is particularly useful when you want to instrument the forwarder’s own HTTP requests to OTLP collectors. This aligns with the OpenTelemetry Collector’s internal telemetry capabilities, allowing you to observe the forwarder’s performance, request patterns, and potential issues when sending data to collectors.
Note: OpenTelemetry’s tracing instrumentation for collectors is still under active development and considered experimental. The instrumented client provides HTTP request tracing that can complement the collector’s internal telemetry when debugging data flow issues or monitoring forwarder performance.
§process_event_batch Orchestrator
(Located in src/processor.rs)
The main generic function that orchestrates the telemetry processing pipeline:
- Calls the provided EventParser’s parse method.
- If telemetry items are produced, calls compact_telemetry_payloads.
- Sends the resulting batch using send_telemetry_batch.
Handles errors at each step.
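The control flow of the three steps above can be sketched as a small generic function. This is a synchronous simplification using closures and `String` errors, whereas the real function is async and takes a parser, an HTTP client, and a compaction config; the names `run_pipeline` and `Outcome` are hypothetical.

```rust
/// Hypothetical result type: distinguishes "sent a batch" from "nothing to send".
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Sent,
    NothingToSend,
}

/// Runs parse -> compact -> send, tagging each error with the step that failed.
pub fn run_pipeline<E, T, B, P, C, S>(event: E, parse: P, compact: C, send: S) -> Result<Outcome, String>
where
    P: Fn(E) -> Result<Vec<T>, String>,
    C: Fn(Vec<T>) -> Result<B, String>,
    S: Fn(B) -> Result<(), String>,
{
    // Step 1: decode the incoming event into telemetry items.
    let items = parse(event).map_err(|e| format!("parse failed: {}", e))?;

    // Compaction and sending are skipped when the event produced no telemetry.
    if items.is_empty() {
        return Ok(Outcome::NothingToSend);
    }

    // Step 2: merge the items into a single batch.
    let batch = compact(items).map_err(|e| format!("compaction failed: {}", e))?;

    // Step 3: export the batch.
    send(batch).map_err(|e| format!("send failed: {}", e))?;
    Ok(Outcome::Sent)
}
```

Passing the steps as parameters mirrors the design choice in the text: the orchestration stays fixed while each event source supplies its own parsing.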
§Installation
This crate is intended to be used as a dependency by other Lambda functions implementing the Serverless OTLP Forwarder architecture. It can be added with the following command:
cargo add serverless-otlp-forwarder-core
§Optional Features
- instrumented-client: Enables the InstrumentedHttpClient for advanced middleware support
[dependencies]
serverless-otlp-forwarder-core = { version = "0.2.1", features = ["instrumented-client"] }
§Usage Example
Implementing a forwarder for AWS CloudWatch Logs containing ExporterOutput JSON from the otlp-stdout-span-exporter crate:
1. Define your parser (src/parser.rs in your Lambda crate):
use aws_lambda_events::cloudwatch_logs::LogsEvent;
use anyhow::Result;
use serverless_otlp_forwarder_core::{EventParser, TelemetryData};
pub struct MyLogsEventParser;
impl EventParser for MyLogsEventParser {
    type EventInput = LogsEvent;

    fn parse(&self, _event_payload: Self::EventInput, _source_identifier: &str) -> Result<Vec<TelemetryData>> {
        // Simplified example - in real usage you'd parse log events containing OTLP data
        let items = Vec::new();
        // Example parsing logic would go here:
        // for log_event in event_payload.aws_logs.data.log_events {
        //     // Parse JSON containing OTLP data and convert to TelemetryData
        // }
        Ok(items)
    }
}
2. Use in your Lambda’s main.rs:
use anyhow::Result;
use lambda_runtime::{Error as LambdaError, LambdaEvent, Runtime, service_fn};
use reqwest::Client as ReqwestClient;
use serverless_otlp_forwarder_core::{
    process_event_batch,
    SpanCompactionConfig,
    client_builder,
    EventParser,
    TelemetryData,
};
use std::sync::Arc;
use aws_lambda_events::cloudwatch_logs::LogsEvent;

// Define the parser inline instead of as a separate module
pub struct MyLogsEventParser;

impl EventParser for MyLogsEventParser {
    type EventInput = LogsEvent;

    fn parse(&self, _event_payload: Self::EventInput, _source_identifier: &str) -> Result<Vec<TelemetryData>> {
        // Simplified example - in real usage you'd parse the actual event
        Ok(vec![])
    }
}

async fn function_handler(
    event: LambdaEvent<LogsEvent>,
    http_client: Arc<ReqwestClient>,
) -> Result<(), LambdaError> {
    let log_group = event.payload.aws_logs.data.log_group.clone();
    let parser = MyLogsEventParser;
    let compaction_config = SpanCompactionConfig::default();
    process_event_batch(
        event.payload,
        &parser,
        &log_group,
        http_client.as_ref(),
        &compaction_config,
    )
    .await
    .map_err(|e| LambdaError::from(e.to_string()))?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), LambdaError> {
    // Initialize telemetry (optional)
    // ... telemetry setup code ...

    // Create HTTP client - Zero boilerplate required!
    let http_client = Arc::new(client_builder::simple());
    // Alternative options:
    // let http_client = Arc::new(ReqwestClient::new());
    // let http_client = Arc::new(client_builder::with_timeout(Duration::from_secs(30)));

    // Initialize the Lambda runtime
    Runtime::new(service_fn(move |event: LambdaEvent<LogsEvent>| {
        let client = Arc::clone(&http_client);
        async move { function_handler(event, client).await }
    })).run().await
}
§Advanced Example with Instrumentation
For more advanced use cases requiring request tracing and observability of the forwarder itself:
# Cargo.toml
[dependencies]
serverless-otlp-forwarder-core = { version = "0.2.1", features = ["instrumented-client"] }
reqwest-middleware = "0.3"
reqwest-tracing = "0.5"

use reqwest::Client as ReqwestClient;
use reqwest_middleware::ClientBuilder;
use reqwest_tracing::TracingMiddleware;
use serverless_otlp_forwarder_core::InstrumentedHttpClient;
use lambda_runtime::Error as LambdaError;
use anyhow::Result;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), LambdaError> {
    // Create instrumented HTTP client to trace forwarder's own requests
    let base_client = ReqwestClient::new();
    let middleware_client = ClientBuilder::new(base_client)
        .with(TracingMiddleware::default())
        .build();
    let http_client = Arc::new(InstrumentedHttpClient::new(middleware_client));
    // Or use the builder function:
    // let http_client = Arc::new(client_builder::instrumented());

    // Rest of setup is identical...
    Ok(())
}
This setup enables distributed tracing of the forwarder’s HTTP requests to OTLP collectors, which is valuable for:
- Debugging data flow issues between the forwarder and collectors
- Monitoring forwarder performance and request latency patterns
- Correlating forwarder behavior with the OpenTelemetry Collector’s internal telemetry
- Identifying bottlenecks in the telemetry pipeline
Note: Since collector tracing instrumentation is experimental, this instrumented client provides a stable way to observe the forwarder’s side of the telemetry pipeline.
§Manual Implementation vs. Built-in HTTP Clients
Manual/Custom Setup (Without this crate’s built-in clients):
use reqwest::{Client as ReqwestClient, Response};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_tracing::TracingMiddleware;
use async_trait::async_trait;
use anyhow::{Result, Context};
use url::Url;
use reqwest::header::HeaderMap;
use bytes::Bytes;
use std::time::Duration;
use std::sync::Arc;
// Custom wrapper type required
pub struct InstrumentedOtlpClient(ClientWithMiddleware);

// Manual trait implementation required (15+ lines per Lambda)
#[async_trait]
trait HttpOtlpForwarderClient {
    async fn post_telemetry(
        &self,
        target_url: Url,
        headers: HeaderMap,
        payload: Bytes,
        timeout: Duration,
    ) -> Result<Response>;
}

#[async_trait]
impl HttpOtlpForwarderClient for InstrumentedOtlpClient {
    async fn post_telemetry(
        &self,
        target_url: Url,
        headers: HeaderMap,
        payload: Bytes,
        timeout: Duration,
    ) -> Result<Response> {
        self.0
            .post(target_url)
            .headers(headers)
            .body(payload)
            .timeout(timeout)
            .send()
            .await
            .context("HTTP request failed via InstrumentedOtlpClient for OTLP export")
    }
}

// Complex setup in main()
let base_client = ReqwestClient::new();
let middleware_client = ClientBuilder::new(base_client)
    .with(TracingMiddleware::default())
    .build();
let http_client = Arc::new(InstrumentedOtlpClient(middleware_client));
Built-in Options (Using this crate’s provided clients):
use reqwest::Client as ReqwestClient;
use serverless_otlp_forwarder_core::client_builder;
use std::sync::Arc;
// Option 1: Simple
let http_client = Arc::new(ReqwestClient::new());
// Option 2: Builder functions
let http_client = Arc::new(client_builder::simple());
// Option 3: Instrumented (with feature flag)
// let http_client = Arc::new(client_builder::instrumented());
§Environment Variables
The http_sender module within this crate respects the following standard OpenTelemetry environment variables for configuring the OTLP export endpoint and headers:
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: The target URL for traces. If not set, OTEL_EXPORTER_OTLP_ENDPOINT is used. Defaults to http://localhost:4318/v1/traces.
- OTEL_EXPORTER_OTLP_ENDPOINT: A base URL for OTLP exports. /v1/traces will be appended if not present in the path.
- OTEL_EXPORTER_OTLP_TRACES_HEADERS: Custom headers for trace exports (e.g., key1=value1,key2=value2).
- OTEL_EXPORTER_OTLP_HEADERS: Custom general OTLP headers, used if trace-specific headers are not set.
- OTEL_EXPORTER_OTLP_COMPRESSION: Whether to compress the payload. Valid values are gzip or none. Defaults to none.
- OTEL_EXPORTER_OTLP_COMPRESSION_LEVEL: The compression level for Gzip compression. Valid values are 0 to 9. Note that this variable is not part of the OTLP specification and is not supported by all OTLP exporters. It is supported here because being able to tune the level is useful in a constrained Lambda environment. Defaults to 9.
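To make the two compression variables concrete, here is a minimal sketch of how their values might be interpreted. The names `Compression` and `parse_compression` are hypothetical, and the handling of unrecognized or out-of-range input (falling back to the documented defaults) is an assumption rather than the crate's confirmed behavior.

```rust
/// Hypothetical representation of the resolved compression setting.
#[derive(Debug, PartialEq)]
pub enum Compression {
    Off,
    Gzip { level: u32 },
}

/// Interprets the raw values of OTEL_EXPORTER_OTLP_COMPRESSION and
/// OTEL_EXPORTER_OTLP_COMPRESSION_LEVEL (passed in so the logic stays testable).
pub fn parse_compression(kind: Option<&str>, level: Option<&str>) -> Compression {
    match kind.map(|s| s.trim().to_ascii_lowercase()).as_deref() {
        Some("gzip") => {
            // Assumed behavior: unparsable or out-of-range levels fall back to 9.
            let level = level
                .and_then(|s| s.trim().parse::<u32>().ok())
                .filter(|l| *l <= 9)
                .unwrap_or(9);
            Compression::Gzip { level }
        }
        // "none", unset, or anything unrecognized: no compression (the documented default).
        _ => Compression::Off,
    }
}
```

In a Lambda function the arguments would typically come from `std::env::var("OTEL_EXPORTER_OTLP_COMPRESSION").ok()` and its level counterpart, converted with `as_deref()`.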
§License
Licensed under the MIT License. See workspace root.
§Re-exports
pub use telemetry::TelemetryData;
pub use span_compactor::compact_telemetry_payloads;
pub use span_compactor::SpanCompactionConfig;
pub use http_sender::client_builder;
pub use http_sender::send_telemetry_batch;
pub use http_sender::HttpClient;
pub use core_parser::EventParser;
pub use processor::process_event_batch;
§Modules
- core_parser
- http_sender
- processor
- span_compactor: Module for compacting multiple OTLP span payloads into a single request
- telemetry