awssdk_instrumentation/lambda/layer/mod.rs
1//! Tower `Layer` and `Service` for per-invocation OTel spans in Lambda.
2//!
3//! The key types are:
4//!
5//! - [`TracingLayer`] — a Tower `Layer` that wraps the Lambda runtime
6//! service. Construct it with a flush callback and optionally configure the
7//! [`OTelFaasTrigger`] attribute.
8//! - [`TracingService`] — the Tower `Service` produced by [`TracingLayer`]. You
9//! rarely need to name this type directly.
10//! - [`FlushedFuture`] — the future returned by [`TracingService`]. It calls
11//! the flush callback in its `Drop` impl, ensuring the exporter is flushed
12//! even when the invocation future is cancelled.
13//! - [`Instrumentor`] — the backend-specific trait that creates and manages
14//! the invocation span. [`TracingInstrumentor`] (tracing-backend) and
15//! [`OtelInstrumentor`] (otel-backend) are the two implementations.
16//! - [`OTelFaasTrigger`] — enum for the `faas.trigger` OTel attribute
17//! (`Http`, `PubSub`, `Timer`, `Datasource`, `Other`).
18//!
19//! [`DefaultTracingLayer`] is a type alias for [`TracingLayer`] with the
20//! default backend instrumentor, which is the most convenient way to construct
21//! the layer.
22//!
23//! ## Per-invocation span attributes
24//!
25//! The layer automatically sets the following OTel attributes on each
26//! invocation span:
27//!
28//! - `faas.trigger` — from [`OTelFaasTrigger`] (default: `Datasource`)
29//! - `faas.invocation_id` — Lambda request ID
30//! - `faas.coldstart` — `true` for the first invocation
31//! - `cloud.account.id` — extracted from the invoked function ARN
32//! - `cloud.resource_id` — the invoked function ARN
33//!
34//! When the `_X_AMZN_TRACE_ID` header is present, the X-Ray trace context is
35//! propagated into the span.
36
37mod utils;
38
39#[cfg(feature = "tracing-backend")]
40mod tracing;
41#[cfg(feature = "tracing-backend")]
42pub use tracing::TracingInstrumentor;
43
44#[cfg(feature = "otel-backend")]
45mod otel;
46#[cfg(feature = "otel-backend")]
47pub use otel::OtelInstrumentor;
48
49pub use utils::OTelFaasTrigger;
50
51use std::{marker::PhantomData, mem::ManuallyDrop, pin::Pin, task};
52use tokio::task::JoinHandle;
53
54use lambda_runtime::{
55 LambdaInvocation, Service,
56 tower::{BoxError, Layer},
57};
58use pin_project::{pin_project, pinned_drop};
59
60use utils::XRayTraceHeader;
61
62use crate::span_write::SpanWrite;
63
64// Tower Layer for Lambda invocations — creates a span per invocation,
65// extracts _X_AMZN_TRACE_ID, sets invocation attributes, flushes exporter.
66
67/// Per-invocation context passed from the Tower layer to the backend [`Instrumentor`].
68#[doc(hidden)]
69#[derive(Debug)]
70pub struct InvocationContext {
71 xray_trace_header: Option<XRayTraceHeader>,
72 function_arn: String,
73 account_id: String,
74 request_id: String,
75 trigger: OTelFaasTrigger,
76 is_coldstart: bool,
77}
78
79/// Backend-specific strategy for creating and managing per-invocation spans.
80///
81/// `Instrumentor` is implemented by [`TracingInstrumentor`] (for the
82/// `tracing-backend` feature) and [`OtelInstrumentor`] (for the `otel-backend`
83/// feature). You do not implement this trait yourself; use the
84/// [`DefaultInstrumentor`] type alias to select the active backend
85/// automatically.
86///
87/// The trait is used as a type parameter on [`TracingLayer`] and
88/// [`TracingService`] to keep the span management logic separate from the Tower
89/// middleware plumbing.
90pub trait Instrumentor {
91 /// The instrumented future type produced by [`instrument`].
92 ///
93 /// [`instrument`]: Instrumentor::instrument
94 type IFut<F: Future>: InstrumentedFuture<Fut: Future<Output = F::Output>>;
95
96 /// The span type used for the per-invocation span.
97 type InvocationSpan: SpanWrite;
98
99 /// Wraps `inner` in a backend-specific instrumented future that creates and
100 /// manages the per-invocation span described by `context`.
101 fn instrument<Fut: Future>(inner: Fut, context: InvocationContext) -> Self::IFut<Fut>;
102
103 /// Calls `f` with a mutable reference to the current invocation span.
104 ///
105 /// This is used by the interceptor to write attributes onto the invocation
106 /// span from within an async task that is a child of the invocation future.
107 fn with_invocation_span(f: impl FnOnce(&mut Self::InvocationSpan));
108
109 /// Spawns a future as a Tokio task, propagating the invocation span context.
110 fn spawn<F>(future: F) -> JoinHandle<F::Output>
111 where
112 F: Future + Send + 'static,
113 F::Output: Send + 'static;
114}
115
/// Marker for futures that carry an instrumentation span around another
/// future.
///
/// The backend-specific futures returned by [`Instrumentor::instrument`]
/// implement it; it is not intended to be implemented by users.
pub trait InstrumentedFuture: Future {
    /// Concrete type of the future being instrumented.
    type Fut: Future;
}
124
/// [`Instrumentor`] picked for the active backend.
///
/// With the `tracing-backend` feature enabled (whether or not `otel-backend`
/// is enabled too) this alias is [`TracingInstrumentor`]; with only
/// `otel-backend` enabled it is [`OtelInstrumentor`]. Pass it as the `I` type
/// parameter of [`TracingLayer`] so that no backend is hard-coded.
#[cfg(feature = "tracing-backend")]
pub type DefaultInstrumentor = TracingInstrumentor;
132
/// [`Instrumentor`] picked for the active backend.
///
/// With only the `otel-backend` feature enabled this alias is
/// [`OtelInstrumentor`]; as soon as `tracing-backend` is enabled it is
/// [`TracingInstrumentor`] instead. Pass it as the `I` type parameter of
/// [`TracingLayer`] so that no backend is hard-coded.
#[cfg(all(not(feature = "tracing-backend"), feature = "otel-backend"))]
pub type DefaultInstrumentor = OtelInstrumentor;
140
141/// A [`TracingLayer`] pre-configured with the default backend instrumentor.
142///
143/// This is the most convenient way to construct the Tower layer for Lambda
144/// invocation instrumentation. The `F` type parameter is the flush callback
145/// type (typically inferred).
146///
147/// # Examples
148///
149/// ```no_run
150/// use awssdk_instrumentation::lambda::layer::DefaultTracingLayer;
151///
152/// // Flush callback — called after each invocation future drops.
153/// let layer = DefaultTracingLayer::new(|| {
154/// // flush the tracer provider here
155/// });
156/// ```
157pub type DefaultTracingLayer<F> = TracingLayer<F, DefaultInstrumentor>;
158
159/// Tower [`Layer`] that wraps the Lambda runtime service with per-invocation OTel spans.
160///
161/// `TracingLayer` intercepts each [`LambdaInvocation`] and:
162///
163/// 1. Parses the `_X_AMZN_TRACE_ID` header and propagates the X-Ray trace
164/// context into the new span.
165/// 2. Creates a `SERVER`-kind span named after the Lambda function with the
166/// `faas.trigger`, `faas.invocation_id`, `faas.coldstart`,
167/// `cloud.account.id`, and `cloud.resource_id` attributes.
168/// 3. Wraps the invocation future in a [`FlushedFuture`] that calls the flush
169/// callback when the future drops, ensuring the exporter is flushed even
170/// when the invocation is cancelled.
171///
172/// Use [`DefaultTracingLayer`] to avoid specifying the `I` type parameter
173/// explicitly.
174///
175/// # Examples
176///
177/// ```no_run
178/// use awssdk_instrumentation::lambda::layer::DefaultTracingLayer;
179/// use awssdk_instrumentation::lambda::OTelFaasTrigger;
180///
181/// let layer = DefaultTracingLayer::new(|| { /* flush */ })
182/// .with_trigger(OTelFaasTrigger::Datasource);
183/// ```
184///
185/// [`Layer`]: lambda_runtime::tower::Layer
186/// [`LambdaInvocation`]: lambda_runtime::LambdaInvocation
187pub struct TracingLayer<F: Fn() + Clone, I: Instrumentor> {
188 flush_fn: F,
189 trigger: OTelFaasTrigger,
190 _phantom: PhantomData<I>,
191}
192
193impl<F: Fn() + Clone, I: Instrumentor> TracingLayer<F, I> {
194 /// Creates a new `TracingLayer` with the given flush callback.
195 ///
196 /// The `flush_fn` is called synchronously in the `Drop` impl of
197 /// [`FlushedFuture`] after each invocation completes or is cancelled. Use
198 /// it to call `tracer_provider.force_flush()`.
199 ///
200 /// The `faas.trigger` attribute defaults to [`OTelFaasTrigger::Datasource`].
201 /// Call [`with_trigger`] to override it.
202 ///
203 /// # Examples
204 ///
205 /// ```no_run
206 /// use awssdk_instrumentation::lambda::layer::DefaultTracingLayer;
207 ///
208 /// let layer = DefaultTracingLayer::new(|| { /* flush */ });
209 /// ```
210 ///
211 /// [`with_trigger`]: TracingLayer::with_trigger
212 pub fn new(flush_fn: F) -> Self {
213 Self {
214 flush_fn,
215 trigger: OTelFaasTrigger::default(),
216 _phantom: PhantomData,
217 }
218 }
219
220 /// Sets the `faas.trigger` OTel attribute for every invocation span.
221 ///
222 /// # Examples
223 ///
224 /// ```no_run
225 /// use awssdk_instrumentation::lambda::layer::DefaultTracingLayer;
226 /// use awssdk_instrumentation::lambda::OTelFaasTrigger;
227 ///
228 /// let layer = DefaultTracingLayer::new(|| { /* flush */ })
229 /// .with_trigger(OTelFaasTrigger::Http);
230 /// ```
231 pub fn with_trigger(self, trigger: OTelFaasTrigger) -> Self {
232 Self { trigger, ..self }
233 }
234}
235
236/// Wraps the Lambda runtime service `S` with per-invocation OTel instrumentation.
237impl<S, F: Fn() + Clone, I: Instrumentor> Layer<S> for TracingLayer<F, I> {
238 type Service = TracingService<I, S, F>;
239
240 fn layer(&self, inner: S) -> Self::Service {
241 TracingService {
242 inner,
243 flush_fn: self.flush_fn.clone(),
244 coldstart: true,
245 trigger: self.trigger,
246 account_id: None,
247 _phantom: PhantomData,
248 }
249 }
250}
251
252/// Tower [`Service`] produced by [`TracingLayer`].
253///
254/// You rarely need to name this type directly. It is returned by
255/// [`TracingLayer::layer`] and implements [`Service<LambdaInvocation>`].
256///
257/// Each call to [`Service::call`] creates a per-invocation OTel span, wraps
258/// the inner service's future in a [`FlushedFuture`], and tracks whether the
259/// invocation is a cold start.
260///
261/// [`Service`]: lambda_runtime::Service
262/// [`Service::call`]: lambda_runtime::Service::call
263pub struct TracingService<I: Instrumentor, S, F> {
264 inner: S,
265 flush_fn: F,
266 coldstart: bool,
267 trigger: OTelFaasTrigger,
268 account_id: Option<String>,
269 _phantom: PhantomData<I>,
270}
271/// Implements [`Service<LambdaInvocation>`] for [`TracingService`].
272///
273/// Each call parses the X-Ray trace header, builds an [`InvocationContext`],
274/// instruments the inner service's future, and wraps it in a [`FlushedFuture`].
275impl<I, S, F: Fn() + Clone> Service<LambdaInvocation> for TracingService<I, S, F>
276where
277 S: Service<LambdaInvocation, Response = (), Error = BoxError>,
278 <I as Instrumentor>::IFut<<S as Service<LambdaInvocation>>::Future>:
279 Future<Output = <<S as Service<LambdaInvocation>>::Future as Future>::Output>,
280 I: Instrumentor,
281{
282 type Response = ();
283 type Error = BoxError;
284 type Future =
285 FlushedFuture<<I as Instrumentor>::IFut<<S as Service<LambdaInvocation>>::Future>, F>;
286
287 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
288 self.inner.poll_ready(cx)
289 }
290
291 fn call(&mut self, req: LambdaInvocation) -> Self::Future {
292 let account_id = self
293 .account_id
294 .get_or_insert_with(|| {
295 req.context
296 .invoked_function_arn
297 .split(':')
298 .nth(4)
299 .map(|v| v.to_owned())
300 .unwrap_or_default()
301 })
302 .to_owned();
303
304 let xray_trace_header = req.context.xray_trace_id.as_ref().and_then(|trace_id| {
305 trace_id
306 .parse()
307 .map_err(|e| log::warn!("Could not parse XRayTraceHeader: {e}"))
308 .ok()
309 });
310
311 let invocation_context = InvocationContext {
312 xray_trace_header,
313 function_arn: req.context.invoked_function_arn.to_owned(),
314 account_id,
315 request_id: req.context.request_id.to_owned(),
316 trigger: self.trigger,
317 is_coldstart: self.coldstart,
318 };
319
320 // Next invocation won't be cold starts by definition
321 self.coldstart = false;
322
323 FlushedFuture {
324 future: ManuallyDrop::new(I::instrument(self.inner.call(req), invocation_context)),
325 flush_fn: self.flush_fn.clone(),
326 }
327 }
328}
329
330/// A future wrapper that calls a flush callback when it drops.
331///
332/// `FlushedFuture` is the future type returned by [`TracingService`]. It wraps
333/// the backend-specific instrumented future and calls `flush_fn` in its `Drop`
334/// impl, ensuring the OTel exporter is flushed after each Lambda invocation
335/// even when the future is cancelled before it completes.
336///
337/// You do not construct `FlushedFuture` directly; it is produced by
338/// [`TracingService::call`].
339#[pin_project(PinnedDrop)]
340pub struct FlushedFuture<Fut: InstrumentedFuture, F: Fn() + Clone> {
341 #[pin]
342 future: ManuallyDrop<Fut>,
343 flush_fn: F,
344}
345
346/// Polls the inner instrumented future, propagating its output.
347impl<Fut: InstrumentedFuture, F: Fn() + Clone> Future for FlushedFuture<Fut, F> {
348 type Output = Fut::Output;
349
350 fn poll(
351 self: std::pin::Pin<&mut Self>,
352 cx: &mut std::task::Context<'_>,
353 ) -> std::task::Poll<Self::Output> {
354 let this = self.project();
355 // SAFETY: As long as `ManuallyDrop<T>` does not move, `T` won't move
356 // and `inner` is valid, because `ManuallyDrop::drop` is called
357 // only inside `Drop` of the `TracingProviderFlusher`.
358 let future: Pin<&mut Fut> = unsafe { this.future.map_unchecked_mut(|fut| &mut **fut) };
359 future.poll(cx)
360 }
361}
362
363#[pinned_drop]
364impl<Fut: InstrumentedFuture, F: Fn() + Clone> PinnedDrop for FlushedFuture<Fut, F> {
365 fn drop(self: std::pin::Pin<&mut Self>) {
366 let this = self.project();
367
368 // SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
369 // different from wrapping `T` in `Option` and calling
370 // `Pin::set(&mut this.inner, None)`, except avoiding
371 // additional memory overhead.
372 // 2. `ManuallyDrop::drop()` is safe, because
373 // `PinnedDrop::drop()` is guaranteed to be called only
374 // once.
375 unsafe { ManuallyDrop::drop(this.future.get_unchecked_mut()) }
376 (*this.flush_fn)();
377 }
378}