telemetry_rust/middleware/
lambda.rs

//! AWS Lambda instrumentation utilities.
//!
//! This module provides an instrumentation layer for AWS Lambda functions.
4
5use crate::{
6    future::{InstrumentedFuture, InstrumentedFutureContext},
7    semconv,
8};
9use lambda_runtime::LambdaInvocation;
10use opentelemetry::trace::SpanKind;
11use opentelemetry_sdk::trace::SdkTracerProvider as TracerProvider;
12use std::task::{Context as TaskContext, Poll};
13use tower::{Layer, Service};
14use tracing::{Instrument, instrument::Instrumented};
15use tracing_opentelemetry_instrumentation_sdk::TRACING_TARGET;
16
17/// OpenTelemetry layer for AWS Lambda functions.
18///
19/// This layer provides automatic tracing instrumentation for AWS Lambda functions,
20/// creating spans for each invocation with appropriate FaaS semantic attributes.
21///
22/// # Example
23///
24/// ```rust,no_run
25/// use lambda_runtime::{
26///     Error as LambdaRuntimeError, Error as LambdaError, LambdaEvent, Runtime,
27///     service_fn,
28/// };
29/// use telemetry_rust::{init_tracing, middleware::lambda::OtelLambdaLayer};
30///
31/// #[tracing::instrument(skip_all, err, fields(req_id = %event.context.request_id))]
32/// pub async fn handle(event: LambdaEvent<()>) -> Result<String, LambdaError> {
33///     Ok(String::from("Hello!"))
34/// }
35///
36/// #[tokio::main]
37/// async fn main() -> Result<(), lambda_runtime::Error> {
38///     // Grab TracerProvider after telemetry initialisation
39///     let provider = init_tracing!(tracing::Level::WARN);
40///
41///     // Create lambda telemetry layer
42///     let telemetry_layer = OtelLambdaLayer::new(provider);
43///
44///     // Run lambda runtime with telemetry layer
45///     Runtime::new(service_fn(handle))
46///         .layer(telemetry_layer)
47///         .run()
48///         .await?;
49///
50///     // Tracer provider will be automatically shutdown when the runtime is dropped
51///
52///     Ok(())
53/// }
54/// ```
55pub struct OtelLambdaLayer {
56    provider: TracerProvider,
57}
58
59impl OtelLambdaLayer {
60    /// Creates a new OpenTelemetry layer for Lambda functions.
61    ///
62    /// # Arguments
63    ///
64    /// * `provider` - The tracer provider to use for creating spans
65    pub fn new(provider: TracerProvider) -> Self {
66        Self { provider }
67    }
68}
69
70impl<S> Layer<S> for OtelLambdaLayer {
71    type Service = OtelLambdaService<S>;
72
73    fn layer(&self, inner: S) -> Self::Service {
74        OtelLambdaService {
75            inner,
76            provider: self.provider.clone(),
77            coldstart: true,
78        }
79    }
80}
81
82impl<T> InstrumentedFutureContext<T> for TracerProvider {
83    fn on_result(self, _: &T) {
84        if let Err(err) = self.force_flush() {
85            tracing::warn!("failed to flush tracer provider: {err:?}");
86        }
87    }
88}
89
90/// OpenTelemetry service wrapper for AWS Lambda functions.
91///
92/// This service wraps Lambda services to provide automatic invocation tracing
93/// with proper span lifecycle management and cold start detection.
94pub struct OtelLambdaService<S> {
95    inner: S,
96    provider: TracerProvider,
97    coldstart: bool,
98}
99
100impl<S> Drop for OtelLambdaService<S> {
101    fn drop(&mut self) {
102        crate::shutdown_tracer_provider(&self.provider)
103    }
104}
105
106impl<S, R> Service<LambdaInvocation> for OtelLambdaService<S>
107where
108    S: Service<LambdaInvocation, Response = R>,
109{
110    type Response = R;
111    type Error = S::Error;
112    type Future = InstrumentedFuture<Instrumented<S::Future>, TracerProvider>;
113
114    fn poll_ready(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<(), Self::Error>> {
115        self.inner.poll_ready(cx)
116    }
117
118    fn call(&mut self, req: LambdaInvocation) -> Self::Future {
119        let span = tracing::trace_span!(
120            target: TRACING_TARGET,
121            "Lambda function invocation",
122            // TODO: set correct otel.kind and faas.trigger
123            // see https://opentelemetry.io/docs/specs/semconv/faas/aws-lambda/
124            "otel.kind" = ?SpanKind::Server,
125            "otel.name" = req.context.env_config.function_name,
126            { semconv::FAAS_TRIGGER } = "other",
127            { semconv::AWS_LAMBDA_INVOKED_ARN } = req.context.invoked_function_arn,
128            { semconv::FAAS_INVOCATION_ID } = req.context.request_id,
129            { semconv::FAAS_COLDSTART } = self.coldstart,
130        );
131
132        self.coldstart = false;
133
134        let future = self.inner.call(req).instrument(span);
135        InstrumentedFuture::new(future, self.provider.clone())
136    }
137}