use aws_lambda_events::{
alb::AlbTargetGroupRequest,
apigw::{ApiGatewayProxyRequest, ApiGatewayV2httpRequest, ApiGatewayWebsocketProxyRequest},
http::HeaderMap,
};
use lambda_runtime::tracing::Span;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::RwLock;
use tracing_opentelemetry::OpenTelemetrySpanExt;
lazy_static! {
static ref COMMON_HEADERS: Vec<&'static str> = vec![
"accept",
"accept-encoding",
"accept-language",
"cache-control",
"content-encoding",
"content-language",
"content-type",
"etag",
"forwarded",
"if-match",
"if-modified-since",
"if-none-match",
"if-unmodified-since",
"last-modified",
"location",
"origin",
"range",
"vary",
"x-forwarded-for",
"x-forwarded-host",
"x-forwarded-proto",
];
static ref CAPTURED_REQUEST_HEADERS: RwLock<HashSet<String>> = {
let mut headers = HashSet::new();
headers.extend(COMMON_HEADERS.iter().map(|&h| h.to_string()));
RwLock::new(headers)
};
static ref CAPTURED_RESPONSE_HEADERS: RwLock<HashSet<String>> = {
let mut headers = HashSet::new();
headers.extend(COMMON_HEADERS.iter().map(|&h| h.to_string()));
RwLock::new(headers)
};
}
#[allow(dead_code)]
pub fn configure_captured_request_headers(headers: &[&str]) {
let mut captured = CAPTURED_REQUEST_HEADERS.write().unwrap();
captured.clear();
captured.extend(headers.iter().map(|&h| h.to_lowercase()));
}
#[allow(dead_code)]
pub fn configure_captured_response_headers(headers: &[&str]) {
let mut captured = CAPTURED_RESPONSE_HEADERS.write().unwrap();
captured.clear();
captured.extend(headers.iter().map(|&h| h.to_lowercase()));
}
pub trait RoutableHttpEvent: Send + Sync + Clone + 'static {
fn path(&self) -> Option<String>;
fn http_method(&self) -> String;
fn route(&self) -> Option<String> {
None
}
fn path_parameters(&self) -> Option<&HashMap<String, String>> {
None
}
fn url_query(&self) -> Option<String> {
None
}
fn client_address(&self) -> Option<String> {
None
}
fn http_headers(&self) -> Option<&HeaderMap> {
None
}
fn response_headers(&self) -> Option<&HeaderMap> {
None
}
fn user_agent(&self) -> Option<String> {
self.http_headers()
.and_then(|h| h.get("user-agent"))
.and_then(|v| v.to_str().ok())
.map(String::from)
}
fn url_scheme(&self) -> String {
"https".to_string() }
fn server_address(&self) -> Option<String> {
self.http_headers()
.and_then(|headers| headers.get("host"))
.and_then(|v| v.to_str().ok())
.map(|host| host.split(':').next().unwrap_or(host).to_string())
}
fn server_port(&self) -> Option<u16> {
Some(443) }
fn set_otel_http_attributes(
&self,
span: &Span,
route_pattern: &str,
lambda_context: &lambda_runtime::Context,
) {
let span_name = format!("{} {}", self.http_method(), route_pattern);
span.record("otel.name", &span_name);
span.record("otel.kind", "SERVER");
span.set_attribute("http.request.method", self.http_method());
span.set_attribute("http.route", route_pattern.to_string());
if let Some(headers) = self.http_headers() {
let captured = CAPTURED_REQUEST_HEADERS.read().unwrap();
if !captured.is_empty() {
for name in captured.iter() {
let values = headers.get_all(name).iter().collect::<Vec<_>>();
if !values.is_empty() {
let header_values: Vec<String> = values
.iter()
.filter_map(|v| v.to_str().ok())
.map(String::from)
.collect();
span.set_attribute(
format!("http.request.header.{name}"),
header_values.join(","),
);
}
}
}
}
if let Some(headers) = self.response_headers() {
let captured = CAPTURED_RESPONSE_HEADERS.read().unwrap();
if !captured.is_empty() {
for name in captured.iter() {
let values = headers.get_all(name).iter().collect::<Vec<_>>();
if !values.is_empty() {
let header_values: Vec<String> = values
.iter()
.filter_map(|v| v.to_str().ok())
.map(String::from)
.collect();
span.set_attribute(
format!("http.response.header.{name}"),
header_values.join(","),
);
}
}
}
}
span.set_attribute("url.path", self.path().unwrap_or_else(|| "/".to_string()));
span.set_attribute("url.scheme", self.url_scheme());
if let Some(query) = self.url_query() {
span.set_attribute("url.query", query);
}
if let Some(addr) = self.server_address() {
span.set_attribute("server.address", addr);
}
if let Some(port) = self.server_port() {
span.set_attribute("server.port", port as i64);
}
if let Some(addr) = self.client_address() {
span.set_attribute("client.address", addr);
}
if let Some(agent) = self.user_agent() {
span.set_attribute("user_agent.original", agent);
}
span.set_attribute("network.protocol.name", "http");
span.set_attribute("network.protocol.version", "1.1");
span.set_attribute("faas.invocation_id", lambda_context.request_id.to_string());
if let Some(account_id) = lambda_context.invoked_function_arn.split(':').nth(4) {
span.set_attribute("cloud.account.id", account_id.to_string());
}
span.set_attribute(
"aws.lambda.invoked_arn",
lambda_context.invoked_function_arn.to_string(),
);
}
}
impl RoutableHttpEvent for ApiGatewayV2httpRequest {
fn path(&self) -> Option<String> {
self.raw_path.clone()
}
fn http_method(&self) -> String {
self.request_context.http.method.to_string()
}
fn url_query(&self) -> Option<String> {
self.raw_query_string.clone()
}
fn client_address(&self) -> Option<String> {
self.request_context.http.source_ip.clone()
}
fn http_headers(&self) -> Option<&HeaderMap> {
Some(&self.headers)
}
}
impl RoutableHttpEvent for ApiGatewayProxyRequest {
fn path(&self) -> Option<String> {
self.path.clone()
}
fn http_method(&self) -> String {
self.http_method.to_string()
}
fn route(&self) -> Option<String> {
self.resource.clone()
}
fn path_parameters(&self) -> Option<&HashMap<String, String>> {
Some(&self.path_parameters)
}
fn url_query(&self) -> Option<String> {
if self.query_string_parameters.is_empty() {
None
} else {
Some(
self.query_string_parameters
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join("&"),
)
}
}
fn client_address(&self) -> Option<String> {
self.request_context.identity.source_ip.clone()
}
fn http_headers(&self) -> Option<&HeaderMap> {
Some(&self.headers)
}
}
impl RoutableHttpEvent for AlbTargetGroupRequest {
fn path(&self) -> Option<String> {
self.path.clone()
}
fn http_method(&self) -> String {
self.http_method.to_string()
}
fn url_query(&self) -> Option<String> {
if self.query_string_parameters.is_empty() {
None
} else {
Some(
self.query_string_parameters
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join("&"),
)
}
}
fn client_address(&self) -> Option<String> {
self.headers
.get("x-forwarded-for")
.and_then(|v| v.to_str().ok())
.map(|ips| {
ips.split(',')
.next() .unwrap_or("")
.trim()
.to_string()
})
}
fn http_headers(&self) -> Option<&HeaderMap> {
Some(&self.headers)
}
}
impl RoutableHttpEvent for ApiGatewayWebsocketProxyRequest {
fn path(&self) -> Option<String> {
self.path.clone()
}
fn http_method(&self) -> String {
self.http_method
.clone()
.map(|m| m.to_string())
.unwrap_or_else(|| "GET".to_string())
}
fn url_query(&self) -> Option<String> {
if self.query_string_parameters.is_empty() {
None
} else {
Some(
self.query_string_parameters
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join("&"),
)
}
}
fn client_address(&self) -> Option<String> {
self.request_context.identity.source_ip.clone()
}
fn http_headers(&self) -> Option<&HeaderMap> {
Some(&self.headers)
}
}