awssdk-instrumentation 0.2.0

Out-of-the-box OpenTelemetry/X-Ray instrumentation for the AWS SDK for Rust, with first-class support for AWS Lambda
Documentation
//! OTel-native backend [`Instrumentor`] implementation for Lambda invocation spans.

use std::{env, mem::ManuallyDrop, pin::Pin, task};

use opentelemetry::{
    Context, KeyValue,
    trace::{SpanBuilder, SpanContext, SpanKind, TraceContextExt, TraceState, Tracer},
};
use opentelemetry_semantic_conventions::attribute as semco;

use pin_project::{pin_project, pinned_drop};
use tokio::task::futures::TaskLocalFuture;

use super::{InstrumentedFuture, Instrumentor, utils::XRayTraceHeader};

/// [`Instrumentor`] implementation for the `otel-backend` feature.
///
/// `OtelInstrumentor` creates an OTel `SERVER`-kind span directly via the
/// global OTel tracer for each Lambda invocation. The span is named after the
/// Lambda function (`AWS_LAMBDA_FUNCTION_NAME`) and carries the standard FaaS
/// attributes.
///
/// The invocation OTel [`Context`] is stored in a Tokio task-local so that
/// [`Instrumentor::with_invocation_span`] can access it from child tasks.
///
/// This type is re-exported as [`DefaultInstrumentor`] when `otel-backend` is
/// the only active backend.
///
/// [`DefaultInstrumentor`]: crate::lambda::layer::DefaultInstrumentor
#[derive(Debug, Clone)]
pub struct OtelInstrumentor;

/// A pinned future that attaches an OTel [`Context`] on every poll and ends
/// the invocation span when dropped.
///
/// `OtelInstrumentedFuture` is the [`InstrumentedFuture`] type produced by
/// [`OtelInstrumentor::instrument`]. It wraps the inner future and:
///
/// - Attaches the invocation [`Context`] as the current OTel context on every
///   `poll`, so child spans created during the invocation are correctly parented.
/// - Ends the invocation span (by dropping the [`Context`]) in its `Drop` impl,
///   ensuring the span is closed even when the future is cancelled.
///
/// You do not construct this type directly.
#[pin_project(PinnedDrop)]
pub struct OtelInstrumentedFuture<Fut: Future> {
    #[pin]
    future: ManuallyDrop<Fut>,
    ctx: Context,
}

/// Polls the inner future with the invocation OTel context attached.
impl<Fut: Future> Future for OtelInstrumentedFuture<Fut> {
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let this = self.project();
        // SAFETY: As long as `ManuallyDrop<T>` does not move, `T` won't move
        //         and `future` is valid, because `ManuallyDrop::drop` is called
        //         only inside `Drop` of the `OtelInstrumentedFuture`.
        let future = unsafe { this.future.map_unchecked_mut(|v| &mut **v) };
        let _guard = this.ctx.clone().attach();
        future.poll(cx)
    }
}

#[pinned_drop]
impl<Fut: Future> PinnedDrop for OtelInstrumentedFuture<Fut> {
    fn drop(self: std::pin::Pin<&mut Self>) {
        let this = self.project();
        let _guard = this.ctx.clone().attach();

        // SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
        //             different from wrapping `T` in `Option` and calling
        //             `Pin::set(&mut this.inner, None)`, except avoiding
        //             additional memory overhead.
        //         2. `ManuallyDrop::drop()` is safe, because
        //            `PinnedDrop::drop()` is guaranteed to be called only
        //            once.
        unsafe { ManuallyDrop::drop(this.future.get_unchecked_mut()) }
    }
}

/// Marks [`OtelInstrumentedFuture`] as an instrumented future.
impl<Fut: Future> InstrumentedFuture for OtelInstrumentedFuture<Fut> {
    type Fut = Self;
}

tokio::task_local! {
    /// Task-local holding the OTel [`Context`] for the current Lambda invocation.
    static INVOCATION_CTX: Context;
}

/// Implements [`Instrumentor`] for the `otel-backend` feature.
impl Instrumentor for OtelInstrumentor {
    type IFut<F: Future> = OtelInstrumentedFuture<TaskLocalFuture<Context, F>>;
    type InvocationSpan = Context;

    fn instrument<F: Future>(inner: F, context: super::InvocationContext) -> Self::IFut<F> {
        let tracer = opentelemetry::global::tracer(env!("CARGO_PKG_NAME"));

        let span_builder = SpanBuilder::from_name(
            env::var("AWS_LAMBDA_FUNCTION_NAME")
                .ok()
                .unwrap_or_default(),
        )
        .with_kind(SpanKind::Server)
        .with_attributes([
            KeyValue::new(semco::FAAS_TRIGGER, context.trigger.to_string()),
            KeyValue::new(semco::CLOUD_RESOURCE_ID, context.function_arn),
            KeyValue::new(semco::FAAS_INVOCATION_ID, context.request_id),
            KeyValue::new(semco::CLOUD_ACCOUNT_ID, context.account_id),
            KeyValue::new(semco::FAAS_COLDSTART, context.is_coldstart),
        ]);

        // Parse XRay TraceId as an XRayTraceHeader and use it to create a parent Span
        // This will enable Xray to link this segments with the overall trace generated by the Lambda service
        let ctx = if let Some(XRayTraceHeader {
            trace_id,
            parent_id,
            sampled,
        }) = context.xray_trace_header
        {
            let otel_context = opentelemetry::Context::new().with_remote_span_context(
                SpanContext::new(trace_id, parent_id, sampled, true, TraceState::NONE),
            );
            let span = tracer.build_with_context(
                span_builder
                    .with_trace_id(trace_id)
                    .with_attributes([KeyValue::new("xray_trace_id", trace_id.to_string())]),
                &otel_context,
            );

            otel_context.with_span(span)
        } else {
            // Use a fresh, empty context as parent so the invocation span is a root span.
            // Context::current() is intentionally avoided to prevent stale state from a
            // previous invocation from being inherited.
            let span = tracer.build_with_context(span_builder, &opentelemetry::Context::new());
            opentelemetry::Context::new().with_span(span)
        };

        // Scope the task-local so with_invocation_span can find it
        let inner = INVOCATION_CTX.scope(ctx.clone(), inner);

        OtelInstrumentedFuture {
            future: ManuallyDrop::new(inner),
            ctx,
        }
    }

    fn with_invocation_span(f: impl FnOnce(&mut Self::InvocationSpan)) {
        INVOCATION_CTX.with(|ctx| {
            // ctx is the *invocation* Context, regardless of what
            // Context::current() points to right now
            let mut ctx = ctx.clone();
            f(&mut ctx);
        });
    }

    fn spawn<F>(future: F) -> tokio::task::JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let invocation_ctx = INVOCATION_CTX.with(|ctx| ctx.clone());
        tokio::spawn(INVOCATION_CTX.scope(invocation_ctx, future))
    }
}