lambda_otel_utils/vended/
lambda_runtime_otel.rs

1//! OpenTelemetry instrumentation layer for AWS Lambda functions.
2//!
3//! This code is based on the AWS Lambda Rust Runtime's OpenTelemetry layer:
4//! https://github.com/awslabs/aws-lambda-rust-runtime/blob/main/lambda-runtime/src/layers/otel.rs
5//!
6//! The main enhancement is the addition of OpenTelemetry span kind support, which is currently
7//! pending upstream review in PR #946:
8//! https://github.com/awslabs/aws-lambda-rust-runtime/pull/946
9//!
10//! Once the PR is merged, this implementation can be deprecated in favor of the upstream version.
11//!
12//! Licensed under the Apache License, Version 2.0
13
14use std::{fmt::Display, future::Future, pin::Pin, task};
15
16use lambda_runtime::tower::{Layer, Service};
17use lambda_runtime::LambdaInvocation;
18use opentelemetry_semantic_conventions::trace as traceconv;
19use pin_project::pin_project;
20use tracing::{field, instrument::Instrumented, Instrument};
21/// Tower layer to add OpenTelemetry tracing to a Lambda function invocation. The layer accepts
22/// a function to flush OpenTelemetry after the end of the invocation.
23pub struct OpenTelemetryLayer<F> {
24    flush_fn: F,
25    otel_attribute_trigger: OpenTelemetryFaasTrigger,
26}
27
28impl<F> OpenTelemetryLayer<F>
29where
30    F: Fn() + Clone,
31{
32    /// Create a new [OpenTelemetryLayer] with the provided flush function.
33    pub fn new(flush_fn: F) -> Self {
34        Self {
35            flush_fn,
36            otel_attribute_trigger: Default::default(),
37        }
38    }
39
40    /// Configure the `faas.trigger` attribute of the OpenTelemetry span.
41    pub fn with_trigger(self, trigger: OpenTelemetryFaasTrigger) -> Self {
42        Self {
43            otel_attribute_trigger: trigger,
44            ..self
45        }
46    }
47}
48
49impl<S, F> Layer<S> for OpenTelemetryLayer<F>
50where
51    F: Fn() + Clone,
52{
53    type Service = OpenTelemetryService<S, F>;
54
55    fn layer(&self, inner: S) -> Self::Service {
56        OpenTelemetryService {
57            inner,
58            flush_fn: self.flush_fn.clone(),
59            coldstart: true,
60            otel_attribute_trigger: self.otel_attribute_trigger.to_string(),
61        }
62    }
63}
64
65/// Tower service created by [OpenTelemetryLayer].
66pub struct OpenTelemetryService<S, F> {
67    inner: S,
68    flush_fn: F,
69    coldstart: bool,
70    otel_attribute_trigger: String,
71}
72
73impl<S, F> Service<LambdaInvocation> for OpenTelemetryService<S, F>
74where
75    S: Service<LambdaInvocation, Response = ()>,
76    F: Fn() + Clone,
77{
78    type Error = S::Error;
79    type Response = ();
80    type Future = OpenTelemetryFuture<Instrumented<S::Future>, F>;
81
82    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
83        self.inner.poll_ready(cx)
84    }
85
86    fn call(&mut self, req: LambdaInvocation) -> Self::Future {
87        let span = tracing::info_span!(
88            "Lambda function invocation",
89            "otel.name" = req.context.env_config.function_name,
90            "otel.kind" = field::Empty,
91            { traceconv::FAAS_TRIGGER } = &self.otel_attribute_trigger,
92            { traceconv::FAAS_INVOCATION_ID } = req.context.request_id,
93            { traceconv::FAAS_COLDSTART } = self.coldstart
94        );
95
96        // After the first execution, we can set 'coldstart' to false
97        self.coldstart = false;
98
99        let future = {
100            // Enter the span before calling the inner service
101            // to ensure that it's assigned as parent of the inner spans.
102            let _guard = span.enter();
103            self.inner.call(req)
104        };
105        OpenTelemetryFuture {
106            future: Some(future.instrument(span)),
107            flush_fn: self.flush_fn.clone(),
108        }
109    }
110}
111
112/// Future created by [OpenTelemetryService].
113#[pin_project]
114pub struct OpenTelemetryFuture<Fut, F> {
115    #[pin]
116    future: Option<Fut>,
117    flush_fn: F,
118}
119
120impl<Fut, F> Future for OpenTelemetryFuture<Fut, F>
121where
122    Fut: Future,
123    F: Fn(),
124{
125    type Output = Fut::Output;
126
127    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
128        // First, try to get the ready value of the future
129        let ready = task::ready!(self
130            .as_mut()
131            .project()
132            .future
133            .as_pin_mut()
134            .expect("future polled after completion")
135            .poll(cx));
136
137        // If we got the ready value, we first drop the future: this ensures that the
138        // OpenTelemetry span attached to it is closed and included in the subsequent flush.
139        Pin::set(&mut self.as_mut().project().future, None);
140        (self.project().flush_fn)();
141        task::Poll::Ready(ready)
142    }
143}
144
145/// Represent the possible values for the OpenTelemetry `faas.trigger` attribute.
146/// See https://opentelemetry.io/docs/specs/semconv/attributes-registry/faas/ for more details.
147#[derive(Default, Clone, Copy)]
148#[non_exhaustive]
149pub enum OpenTelemetryFaasTrigger {
150    /// A response to some data source operation such as a database or filesystem read/write
151    #[default]
152    Datasource,
153    /// To provide an answer to an inbound HTTP request
154    Http,
155    /// A function is set to be executed when messages are sent to a messaging system
156    PubSub,
157    /// A function is scheduled to be executed regularly
158    Timer,
159    /// If none of the others apply
160    Other,
161}
162
163impl Display for OpenTelemetryFaasTrigger {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        match self {
166            OpenTelemetryFaasTrigger::Datasource => write!(f, "datasource"),
167            OpenTelemetryFaasTrigger::Http => write!(f, "http"),
168            OpenTelemetryFaasTrigger::PubSub => write!(f, "pubsub"),
169            OpenTelemetryFaasTrigger::Timer => write!(f, "timer"),
170            OpenTelemetryFaasTrigger::Other => write!(f, "other"),
171        }
172    }
173}