opentelemetry_lambda_tower/
service.rs1use crate::cold_start::check_cold_start;
4use crate::extractor::TraceContextExtractor;
5use crate::future::OtelTracingFuture;
6use lambda_runtime::LambdaEvent;
7use opentelemetry_sdk::logs::SdkLoggerProvider;
8use opentelemetry_sdk::trace::SdkTracerProvider;
9use opentelemetry_semantic_conventions::attribute::{
10 CLIENT_ADDRESS, CLOUD_ACCOUNT_ID, CLOUD_PROVIDER, CLOUD_REGION, FAAS_MAX_MEMORY, FAAS_NAME,
11 FAAS_VERSION, HTTP_REQUEST_METHOD, HTTP_ROUTE, MESSAGING_BATCH_MESSAGE_COUNT,
12 MESSAGING_DESTINATION_NAME, MESSAGING_MESSAGE_ID, MESSAGING_OPERATION_TYPE, MESSAGING_SYSTEM,
13 NETWORK_PROTOCOL_VERSION, SERVER_ADDRESS, URL_PATH, URL_QUERY, URL_SCHEME, USER_AGENT_ORIGINAL,
14};
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use std::time::Duration;
18use tower::Service;
19use tracing::Span;
20use tracing_opentelemetry::OpenTelemetrySpanExt;
21
22#[derive(Clone)]
35pub struct OtelTracingService<S, E> {
36 inner: S,
37 extractor: E,
38 tracer_provider: Option<Arc<SdkTracerProvider>>,
39 logger_provider: Option<Arc<SdkLoggerProvider>>,
40 flush_on_end: bool,
41 flush_timeout: Duration,
42}
43
44impl<S, E> OtelTracingService<S, E> {
45 pub(crate) fn new(
47 inner: S,
48 extractor: E,
49 tracer_provider: Option<Arc<SdkTracerProvider>>,
50 logger_provider: Option<Arc<SdkLoggerProvider>>,
51 flush_on_end: bool,
52 flush_timeout: Duration,
53 ) -> Self {
54 Self {
55 inner,
56 extractor,
57 tracer_provider,
58 logger_provider,
59 flush_on_end,
60 flush_timeout,
61 }
62 }
63}
64
65impl<S, E, T> Service<LambdaEvent<T>> for OtelTracingService<S, E>
66where
67 S: Service<LambdaEvent<T>>,
68 S::Error: std::fmt::Display,
69 E: TraceContextExtractor<T>,
70 T: Send + 'static,
71{
72 type Response = S::Response;
73 type Error = S::Error;
74 type Future = OtelTracingFuture<S::Future, S::Response, S::Error>;
75
76 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
77 self.inner.poll_ready(cx)
78 }
79
80 fn call(&mut self, event: LambdaEvent<T>) -> Self::Future {
81 let (payload, lambda_ctx) = event.into_parts();
82
83 let parent_context = self.extractor.extract_context(&payload);
85
86 let links = self.extractor.extract_links(&payload);
88
89 let is_cold_start = check_cold_start();
91
92 let span_name = self.extractor.span_name(&payload, &lambda_ctx);
94
95 let span = tracing::info_span!(
98 "lambda.invoke",
99 otel.name = %span_name,
100 otel.kind = "server",
101 faas.trigger = %self.extractor.trigger_type(),
102 faas.invocation_id = %lambda_ctx.request_id,
103 faas.coldstart = is_cold_start,
104 { HTTP_REQUEST_METHOD } = tracing::field::Empty,
106 { URL_PATH } = tracing::field::Empty,
107 { URL_QUERY } = tracing::field::Empty,
108 { URL_SCHEME } = tracing::field::Empty,
109 { HTTP_ROUTE } = tracing::field::Empty,
110 { USER_AGENT_ORIGINAL } = tracing::field::Empty,
111 { CLIENT_ADDRESS } = tracing::field::Empty,
112 { SERVER_ADDRESS } = tracing::field::Empty,
113 { NETWORK_PROTOCOL_VERSION } = tracing::field::Empty,
114 { MESSAGING_SYSTEM } = tracing::field::Empty,
116 { MESSAGING_OPERATION_TYPE } = tracing::field::Empty,
117 { MESSAGING_DESTINATION_NAME } = tracing::field::Empty,
118 { MESSAGING_MESSAGE_ID } = tracing::field::Empty,
119 { MESSAGING_BATCH_MESSAGE_COUNT } = tracing::field::Empty,
120 { CLOUD_PROVIDER } = tracing::field::Empty,
122 { CLOUD_REGION } = tracing::field::Empty,
123 { CLOUD_ACCOUNT_ID } = tracing::field::Empty,
124 { FAAS_NAME } = tracing::field::Empty,
125 { FAAS_VERSION } = tracing::field::Empty,
126 { FAAS_MAX_MEMORY } = tracing::field::Empty,
127 aws.lambda.invoked_arn = tracing::field::Empty,
128 );
129
130 let _ = span.set_parent(parent_context);
132
133 for link in links {
135 span.add_link(link.span_context.clone());
136 }
137
138 self.extractor.record_attributes(&payload, &span);
140
141 record_lambda_context_attributes(&span, &lambda_ctx);
143
144 let event = LambdaEvent::new(payload, lambda_ctx);
146
147 let future = {
152 let _guard = span.enter();
153 self.inner.call(event)
154 };
155
156 OtelTracingFuture::new(
157 future,
158 span,
159 self.tracer_provider.clone(),
160 self.logger_provider.clone(),
161 self.flush_on_end,
162 self.flush_timeout,
163 )
164 }
165}
166
167fn record_lambda_context_attributes(span: &Span, ctx: &lambda_runtime::Context) {
169 span.record(CLOUD_PROVIDER, "aws");
170 span.record(FAAS_NAME, ctx.env_config.function_name.as_str());
171 span.record(FAAS_VERSION, ctx.env_config.version.as_str());
172
173 let memory_bytes = ctx.env_config.memory as i64 * 1024 * 1024;
174 span.record(FAAS_MAX_MEMORY, memory_bytes);
175
176 if let Ok(region) = std::env::var("AWS_REGION") {
177 span.record(CLOUD_REGION, region.as_str());
178 }
179
180 span.record("aws.lambda.invoked_arn", ctx.invoked_function_arn.as_str());
181
182 if let Some(account_id) = ctx.invoked_function_arn.split(':').nth(4) {
183 span.record(CLOUD_ACCOUNT_ID, account_id);
184 }
185}