telemetry_rust/middleware/
lambda.rs1use crate::{
6 future::{InstrumentedFuture, InstrumentedFutureContext},
7 semconv,
8};
9use lambda_runtime::LambdaInvocation;
10use opentelemetry::trace::SpanKind;
11use opentelemetry_sdk::trace::SdkTracerProvider as TracerProvider;
12use std::task::{Context as TaskContext, Poll};
13use tower::{Layer, Service};
14use tracing::{Instrument, instrument::Instrumented};
15use tracing_opentelemetry_instrumentation_sdk::TRACING_TARGET;
16
17pub struct OtelLambdaLayer {
56 provider: TracerProvider,
57}
58
59impl OtelLambdaLayer {
60 pub fn new(provider: TracerProvider) -> Self {
66 Self { provider }
67 }
68}
69
70impl<S> Layer<S> for OtelLambdaLayer {
71 type Service = OtelLambdaService<S>;
72
73 fn layer(&self, inner: S) -> Self::Service {
74 OtelLambdaService {
75 inner,
76 provider: self.provider.clone(),
77 coldstart: true,
78 }
79 }
80}
81
82impl<T> InstrumentedFutureContext<T> for TracerProvider {
83 fn on_result(self, _: &T) {
84 if let Err(err) = self.force_flush() {
85 tracing::warn!("failed to flush tracer provider: {err:?}");
86 }
87 }
88}
89
90pub struct OtelLambdaService<S> {
95 inner: S,
96 provider: TracerProvider,
97 coldstart: bool,
98}
99
100impl<S> Drop for OtelLambdaService<S> {
101 fn drop(&mut self) {
102 crate::shutdown_tracer_provider(&self.provider)
103 }
104}
105
106impl<S, R> Service<LambdaInvocation> for OtelLambdaService<S>
107where
108 S: Service<LambdaInvocation, Response = R>,
109{
110 type Response = R;
111 type Error = S::Error;
112 type Future = InstrumentedFuture<Instrumented<S::Future>, TracerProvider>;
113
114 fn poll_ready(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<(), Self::Error>> {
115 self.inner.poll_ready(cx)
116 }
117
118 fn call(&mut self, req: LambdaInvocation) -> Self::Future {
119 let span = tracing::trace_span!(
120 target: TRACING_TARGET,
121 "Lambda function invocation",
122 "otel.kind" = ?SpanKind::Server,
125 "otel.name" = req.context.env_config.function_name,
126 { semconv::FAAS_TRIGGER } = "other",
127 { semconv::AWS_LAMBDA_INVOKED_ARN } = req.context.invoked_function_arn,
128 { semconv::FAAS_INVOCATION_ID } = req.context.request_id,
129 { semconv::FAAS_COLDSTART } = self.coldstart,
130 );
131
132 self.coldstart = false;
133
134 let future = self.inner.call(req).instrument(span);
135 InstrumentedFuture::new(future, self.provider.clone())
136 }
137}