Skip to main content

awssdk_instrumentation/lambda/layer/
mod.rs

1//! Tower `Layer` and `Service` for per-invocation OTel spans in Lambda.
2//!
3//! The key types are:
4//!
5//! - [`TracingLayer`] — a Tower `Layer` that wraps the Lambda runtime
6//!   service. Construct it with a flush callback and optionally configure the
7//!   [`OTelFaasTrigger`] attribute.
8//! - [`TracingService`] — the Tower `Service` produced by [`TracingLayer`]. You
9//!   rarely need to name this type directly.
10//! - [`FlushedFuture`] — the future returned by [`TracingService`]. It calls
11//!   the flush callback in its `Drop` impl, ensuring the exporter is flushed
12//!   even when the invocation future is cancelled.
13//! - [`Instrumentor`] — the backend-specific trait that creates and manages
14//!   the invocation span. [`TracingInstrumentor`] (tracing-backend) and
15//!   [`OtelInstrumentor`] (otel-backend) are the two implementations.
16//! - [`OTelFaasTrigger`] — enum for the `faas.trigger` OTel attribute
17//!   (`Http`, `PubSub`, `Timer`, `Datasource`, `Other`).
18//!
19//! [`DefaultTracingLayer`] is a type alias for [`TracingLayer`] with the
20//! default backend instrumentor, which is the most convenient way to construct
21//! the layer.
22//!
23//! ## Per-invocation span attributes
24//!
25//! The layer automatically sets the following OTel attributes on each
26//! invocation span:
27//!
28//! - `faas.trigger` — from [`OTelFaasTrigger`] (default: `Datasource`)
29//! - `faas.invocation_id` — Lambda request ID
30//! - `faas.coldstart` — `true` for the first invocation
31//! - `cloud.account.id` — extracted from the invoked function ARN
32//! - `cloud.resource_id` — the invoked function ARN
33//!
34//! When the `_X_AMZN_TRACE_ID` header is present, the X-Ray trace context is
35//! propagated into the span.
36
37mod utils;
38
39#[cfg(feature = "tracing-backend")]
40mod tracing;
41#[cfg(feature = "tracing-backend")]
42pub use tracing::TracingInstrumentor;
43
44#[cfg(feature = "otel-backend")]
45mod otel;
46#[cfg(feature = "otel-backend")]
47pub use otel::OtelInstrumentor;
48
49pub use utils::OTelFaasTrigger;
50
51use std::{marker::PhantomData, mem::ManuallyDrop, pin::Pin, task};
52use tokio::task::JoinHandle;
53
54use lambda_runtime::{
55    LambdaInvocation, Service,
56    tower::{BoxError, Layer},
57};
58use pin_project::{pin_project, pinned_drop};
59
60use utils::XRayTraceHeader;
61
62use crate::span_write::SpanWrite;
63
64// Tower Layer for Lambda invocations — creates a span per invocation,
65// extracts _X_AMZN_TRACE_ID, sets invocation attributes, flushes exporter.
66
67/// Per-invocation context passed from the Tower layer to the backend [`Instrumentor`].
68#[doc(hidden)]
69#[derive(Debug)]
70pub struct InvocationContext {
71    xray_trace_header: Option<XRayTraceHeader>,
72    function_arn: String,
73    account_id: String,
74    request_id: String,
75    trigger: OTelFaasTrigger,
76    is_coldstart: bool,
77}
78
79/// Backend-specific strategy for creating and managing per-invocation spans.
80///
81/// `Instrumentor` is implemented by [`TracingInstrumentor`] (for the
82/// `tracing-backend` feature) and [`OtelInstrumentor`] (for the `otel-backend`
83/// feature). You do not implement this trait yourself; use the
84/// [`DefaultInstrumentor`] type alias to select the active backend
85/// automatically.
86///
87/// The trait is used as a type parameter on [`TracingLayer`] and
88/// [`TracingService`] to keep the span management logic separate from the Tower
89/// middleware plumbing.
90pub trait Instrumentor {
91    /// The instrumented future type produced by [`instrument`].
92    ///
93    /// [`instrument`]: Instrumentor::instrument
94    type IFut<F: Future>: InstrumentedFuture<Fut: Future<Output = F::Output>>;
95
96    /// The span type used for the per-invocation span.
97    type InvocationSpan: SpanWrite;
98
99    /// Wraps `inner` in a backend-specific instrumented future that creates and
100    /// manages the per-invocation span described by `context`.
101    fn instrument<Fut: Future>(inner: Fut, context: InvocationContext) -> Self::IFut<Fut>;
102
103    /// Calls `f` with a mutable reference to the current invocation span.
104    ///
105    /// This is used by the interceptor to write attributes onto the invocation
106    /// span from within an async task that is a child of the invocation future.
107    fn with_invocation_span(f: impl FnOnce(&mut Self::InvocationSpan));
108
109    /// Spawns a future as a Tokio task, propagating the invocation span context.
110    fn spawn<F>(future: F) -> JoinHandle<F::Output>
111    where
112        F: Future + Send + 'static,
113        F::Output: Send + 'static;
114}
115
/// Marker for futures that carry an instrumentation span.
///
/// The backend-specific future types returned by
/// [`Instrumentor::instrument`] implement this trait. It is not meant to be
/// implemented by users.
pub trait InstrumentedFuture: Future {
    /// Concrete type of the future that is being instrumented.
    type Fut: Future;
}
124
/// Backend-selected default [`Instrumentor`].
///
/// This definition is compiled when the `tracing-backend` feature is enabled
/// and resolves to [`TracingInstrumentor`]. When only `otel-backend` is
/// enabled the alias resolves to [`OtelInstrumentor`] instead. Pass it as the
/// `I` type parameter of [`TracingLayer`] to avoid hard-coding a backend.
#[cfg(feature = "tracing-backend")]
pub type DefaultInstrumentor = TracingInstrumentor;

/// Backend-selected default [`Instrumentor`].
///
/// This definition is compiled when `otel-backend` is enabled and
/// `tracing-backend` is not, and resolves to [`OtelInstrumentor`]. When
/// `tracing-backend` is enabled the alias resolves to
/// [`TracingInstrumentor`] instead. Pass it as the `I` type parameter of
/// [`TracingLayer`] to avoid hard-coding a backend.
#[cfg(all(feature = "otel-backend", not(feature = "tracing-backend")))]
pub type DefaultInstrumentor = OtelInstrumentor;
140
141/// A [`TracingLayer`] pre-configured with the default backend instrumentor.
142///
143/// This is the most convenient way to construct the Tower layer for Lambda
144/// invocation instrumentation. The `F` type parameter is the flush callback
145/// type (typically inferred).
146///
147/// # Examples
148///
149/// ```no_run
150/// use awssdk_instrumentation::lambda::layer::DefaultTracingLayer;
151///
152/// // Flush callback — called after each invocation future drops.
153/// let layer = DefaultTracingLayer::new(|| {
154///     // flush the tracer provider here
155/// });
156/// ```
157pub type DefaultTracingLayer<F> = TracingLayer<F, DefaultInstrumentor>;
158
159/// Tower [`Layer`] that wraps the Lambda runtime service with per-invocation OTel spans.
160///
161/// `TracingLayer` intercepts each [`LambdaInvocation`] and:
162///
163/// 1. Parses the `_X_AMZN_TRACE_ID` header and propagates the X-Ray trace
164///    context into the new span.
165/// 2. Creates a `SERVER`-kind span named after the Lambda function with the
166///    `faas.trigger`, `faas.invocation_id`, `faas.coldstart`,
167///    `cloud.account.id`, and `cloud.resource_id` attributes.
168/// 3. Wraps the invocation future in a [`FlushedFuture`] that calls the flush
169///    callback when the future drops, ensuring the exporter is flushed even
170///    when the invocation is cancelled.
171///
172/// Use [`DefaultTracingLayer`] to avoid specifying the `I` type parameter
173/// explicitly.
174///
175/// # Examples
176///
177/// ```no_run
178/// use awssdk_instrumentation::lambda::layer::DefaultTracingLayer;
179/// use awssdk_instrumentation::lambda::OTelFaasTrigger;
180///
181/// let layer = DefaultTracingLayer::new(|| { /* flush */ })
182///     .with_trigger(OTelFaasTrigger::Datasource);
183/// ```
184///
185/// [`Layer`]: lambda_runtime::tower::Layer
186/// [`LambdaInvocation`]: lambda_runtime::LambdaInvocation
187pub struct TracingLayer<F: Fn() + Clone, I: Instrumentor> {
188    flush_fn: F,
189    trigger: OTelFaasTrigger,
190    _phantom: PhantomData<I>,
191}
192
193impl<F: Fn() + Clone, I: Instrumentor> TracingLayer<F, I> {
194    /// Creates a new `TracingLayer` with the given flush callback.
195    ///
196    /// The `flush_fn` is called synchronously in the `Drop` impl of
197    /// [`FlushedFuture`] after each invocation completes or is cancelled. Use
198    /// it to call `tracer_provider.force_flush()`.
199    ///
200    /// The `faas.trigger` attribute defaults to [`OTelFaasTrigger::Datasource`].
201    /// Call [`with_trigger`] to override it.
202    ///
203    /// # Examples
204    ///
205    /// ```no_run
206    /// use awssdk_instrumentation::lambda::layer::DefaultTracingLayer;
207    ///
208    /// let layer = DefaultTracingLayer::new(|| { /* flush */ });
209    /// ```
210    ///
211    /// [`with_trigger`]: TracingLayer::with_trigger
212    pub fn new(flush_fn: F) -> Self {
213        Self {
214            flush_fn,
215            trigger: OTelFaasTrigger::default(),
216            _phantom: PhantomData,
217        }
218    }
219
220    /// Sets the `faas.trigger` OTel attribute for every invocation span.
221    ///
222    /// # Examples
223    ///
224    /// ```no_run
225    /// use awssdk_instrumentation::lambda::layer::DefaultTracingLayer;
226    /// use awssdk_instrumentation::lambda::OTelFaasTrigger;
227    ///
228    /// let layer = DefaultTracingLayer::new(|| { /* flush */ })
229    ///     .with_trigger(OTelFaasTrigger::Http);
230    /// ```
231    pub fn with_trigger(self, trigger: OTelFaasTrigger) -> Self {
232        Self { trigger, ..self }
233    }
234}
235
236/// Wraps the Lambda runtime service `S` with per-invocation OTel instrumentation.
237impl<S, F: Fn() + Clone, I: Instrumentor> Layer<S> for TracingLayer<F, I> {
238    type Service = TracingService<I, S, F>;
239
240    fn layer(&self, inner: S) -> Self::Service {
241        TracingService {
242            inner,
243            flush_fn: self.flush_fn.clone(),
244            coldstart: true,
245            trigger: self.trigger,
246            account_id: None,
247            _phantom: PhantomData,
248        }
249    }
250}
251
252/// Tower [`Service`] produced by [`TracingLayer`].
253///
254/// You rarely need to name this type directly. It is returned by
255/// [`TracingLayer::layer`] and implements [`Service<LambdaInvocation>`].
256///
257/// Each call to [`Service::call`] creates a per-invocation OTel span, wraps
258/// the inner service's future in a [`FlushedFuture`], and tracks whether the
259/// invocation is a cold start.
260///
261/// [`Service`]: lambda_runtime::Service
262/// [`Service::call`]: lambda_runtime::Service::call
263pub struct TracingService<I: Instrumentor, S, F> {
264    inner: S,
265    flush_fn: F,
266    coldstart: bool,
267    trigger: OTelFaasTrigger,
268    account_id: Option<String>,
269    _phantom: PhantomData<I>,
270}
271/// Implements [`Service<LambdaInvocation>`] for [`TracingService`].
272///
273/// Each call parses the X-Ray trace header, builds an [`InvocationContext`],
274/// instruments the inner service's future, and wraps it in a [`FlushedFuture`].
275impl<I, S, F: Fn() + Clone> Service<LambdaInvocation> for TracingService<I, S, F>
276where
277    S: Service<LambdaInvocation, Response = (), Error = BoxError>,
278    <I as Instrumentor>::IFut<<S as Service<LambdaInvocation>>::Future>:
279        Future<Output = <<S as Service<LambdaInvocation>>::Future as Future>::Output>,
280    I: Instrumentor,
281{
282    type Response = ();
283    type Error = BoxError;
284    type Future =
285        FlushedFuture<<I as Instrumentor>::IFut<<S as Service<LambdaInvocation>>::Future>, F>;
286
287    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
288        self.inner.poll_ready(cx)
289    }
290
291    fn call(&mut self, req: LambdaInvocation) -> Self::Future {
292        let account_id = self
293            .account_id
294            .get_or_insert_with(|| {
295                req.context
296                    .invoked_function_arn
297                    .split(':')
298                    .nth(4)
299                    .map(|v| v.to_owned())
300                    .unwrap_or_default()
301            })
302            .to_owned();
303
304        let xray_trace_header = req.context.xray_trace_id.as_ref().and_then(|trace_id| {
305            trace_id
306                .parse()
307                .map_err(|e| log::warn!("Could not parse XRayTraceHeader: {e}"))
308                .ok()
309        });
310
311        let invocation_context = InvocationContext {
312            xray_trace_header,
313            function_arn: req.context.invoked_function_arn.to_owned(),
314            account_id,
315            request_id: req.context.request_id.to_owned(),
316            trigger: self.trigger,
317            is_coldstart: self.coldstart,
318        };
319
320        // Next invocation won't be cold starts by definition
321        self.coldstart = false;
322
323        FlushedFuture {
324            future: ManuallyDrop::new(I::instrument(self.inner.call(req), invocation_context)),
325            flush_fn: self.flush_fn.clone(),
326        }
327    }
328}
329
330/// A future wrapper that calls a flush callback when it drops.
331///
332/// `FlushedFuture` is the future type returned by [`TracingService`]. It wraps
333/// the backend-specific instrumented future and calls `flush_fn` in its `Drop`
334/// impl, ensuring the OTel exporter is flushed after each Lambda invocation
335/// even when the future is cancelled before it completes.
336///
337/// You do not construct `FlushedFuture` directly; it is produced by
338/// [`TracingService::call`].
339#[pin_project(PinnedDrop)]
340pub struct FlushedFuture<Fut: InstrumentedFuture, F: Fn() + Clone> {
341    #[pin]
342    future: ManuallyDrop<Fut>,
343    flush_fn: F,
344}
345
346/// Polls the inner instrumented future, propagating its output.
347impl<Fut: InstrumentedFuture, F: Fn() + Clone> Future for FlushedFuture<Fut, F> {
348    type Output = Fut::Output;
349
350    fn poll(
351        self: std::pin::Pin<&mut Self>,
352        cx: &mut std::task::Context<'_>,
353    ) -> std::task::Poll<Self::Output> {
354        let this = self.project();
355        // SAFETY: As long as `ManuallyDrop<T>` does not move, `T` won't move
356        //         and `inner` is valid, because `ManuallyDrop::drop` is called
357        //         only inside `Drop` of the `TracingProviderFlusher`.
358        let future: Pin<&mut Fut> = unsafe { this.future.map_unchecked_mut(|fut| &mut **fut) };
359        future.poll(cx)
360    }
361}
362
363#[pinned_drop]
364impl<Fut: InstrumentedFuture, F: Fn() + Clone> PinnedDrop for FlushedFuture<Fut, F> {
365    fn drop(self: std::pin::Pin<&mut Self>) {
366        let this = self.project();
367
368        // SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
369        //             different from wrapping `T` in `Option` and calling
370        //             `Pin::set(&mut this.inner, None)`, except avoiding
371        //             additional memory overhead.
372        //         2. `ManuallyDrop::drop()` is safe, because
373        //            `PinnedDrop::drop()` is guaranteed to be called only
374        //            once.
375        unsafe { ManuallyDrop::drop(this.future.get_unchecked_mut()) }
376        (*this.flush_fn)();
377    }
378}