opentelemetry_lambda_tower/
service.rs1use crate::cold_start::check_cold_start;
4use crate::extractor::TraceContextExtractor;
5use crate::future::OtelTracingFuture;
6use lambda_runtime::LambdaEvent;
7use opentelemetry_sdk::logs::SdkLoggerProvider;
8use opentelemetry_sdk::trace::SdkTracerProvider;
9use opentelemetry_semantic_conventions::attribute::{
10 CLOUD_ACCOUNT_ID, CLOUD_PROVIDER, CLOUD_REGION, FAAS_MAX_MEMORY, FAAS_NAME, FAAS_VERSION,
11};
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::Duration;
15use tower::Service;
16use tracing::Span;
17use tracing_opentelemetry::OpenTelemetrySpanExt;
18
19#[derive(Clone)]
32pub struct OtelTracingService<S, E> {
33 inner: S,
34 extractor: E,
35 tracer_provider: Option<Arc<SdkTracerProvider>>,
36 logger_provider: Option<Arc<SdkLoggerProvider>>,
37 flush_on_end: bool,
38 flush_timeout: Duration,
39}
40
41impl<S, E> OtelTracingService<S, E> {
42 pub(crate) fn new(
44 inner: S,
45 extractor: E,
46 tracer_provider: Option<Arc<SdkTracerProvider>>,
47 logger_provider: Option<Arc<SdkLoggerProvider>>,
48 flush_on_end: bool,
49 flush_timeout: Duration,
50 ) -> Self {
51 Self {
52 inner,
53 extractor,
54 tracer_provider,
55 logger_provider,
56 flush_on_end,
57 flush_timeout,
58 }
59 }
60}
61
62impl<S, E, T> Service<LambdaEvent<T>> for OtelTracingService<S, E>
63where
64 S: Service<LambdaEvent<T>>,
65 S::Error: std::fmt::Display,
66 E: TraceContextExtractor<T>,
67 T: Send + 'static,
68{
69 type Response = S::Response;
70 type Error = S::Error;
71 type Future = OtelTracingFuture<S::Future, S::Response, S::Error>;
72
73 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
74 self.inner.poll_ready(cx)
75 }
76
77 fn call(&mut self, event: LambdaEvent<T>) -> Self::Future {
78 let (payload, lambda_ctx) = event.into_parts();
79
80 let parent_context = self.extractor.extract_context(&payload);
82
83 let links = self.extractor.extract_links(&payload);
85
86 let is_cold_start = check_cold_start();
88
89 let span_name = self.extractor.span_name(&payload, &lambda_ctx);
91
92 let span = tracing::info_span!(
94 "lambda.invoke",
95 otel.name = %span_name,
96 otel.kind = "server",
97 faas.trigger = %self.extractor.trigger_type(),
98 faas.invocation_id = %lambda_ctx.request_id,
99 faas.coldstart = is_cold_start,
100 );
101
102 let _ = span.set_parent(parent_context);
104
105 for link in links {
107 span.add_link(link.span_context.clone());
108 }
109
110 self.extractor.record_attributes(&payload, &span);
112
113 record_lambda_context_attributes(&span, &lambda_ctx);
115
116 let event = LambdaEvent::new(payload, lambda_ctx);
118
119 let future = {
124 let _guard = span.enter();
125 self.inner.call(event)
126 };
127
128 OtelTracingFuture::new(
129 future,
130 span,
131 self.tracer_provider.clone(),
132 self.logger_provider.clone(),
133 self.flush_on_end,
134 self.flush_timeout,
135 )
136 }
137}
138
139fn record_lambda_context_attributes(span: &Span, ctx: &lambda_runtime::Context) {
141 span.record(CLOUD_PROVIDER, "aws");
142 span.record(FAAS_NAME, ctx.env_config.function_name.as_str());
143 span.record(FAAS_VERSION, ctx.env_config.version.as_str());
144
145 let memory_bytes = ctx.env_config.memory as i64 * 1024 * 1024;
146 span.record(FAAS_MAX_MEMORY, memory_bytes);
147
148 if let Ok(region) = std::env::var("AWS_REGION") {
149 span.record(CLOUD_REGION, region.as_str());
150 }
151
152 span.record("aws.lambda.invoked_arn", ctx.invoked_function_arn.as_str());
153
154 if let Some(account_id) = ctx.invoked_function_arn.split(':').nth(4) {
155 span.record(CLOUD_ACCOUNT_ID, account_id);
156 }
157}