use crate::constants::defaults;
use crate::constants::{env_vars, resource_attributes};
use opentelemetry::KeyValue;
use opentelemetry_sdk::Resource;
use std::env;
/// Builds the OpenTelemetry [`Resource`] describing the current Lambda environment.
///
/// Every attribute is derived from an environment variable and is emitted only when
/// that variable is set (and, for numeric values, parses as an `i64`):
///
/// | Environment variable                 | Attribute(s)                                  |
/// |--------------------------------------|-----------------------------------------------|
/// | `AWS_REGION`                         | `cloud.provider` (`"aws"`), `cloud.region`    |
/// | `env_vars::AWS_LAMBDA_FUNCTION_NAME` | `faas.name`                                   |
/// | `AWS_LAMBDA_FUNCTION_VERSION`        | `faas.version`                                |
/// | `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`    | `faas.max_memory` (value × 1024 × 1024)       |
/// | `AWS_LAMBDA_LOG_STREAM_NAME`         | `faas.instance`                               |
/// | `env_vars::PROCESSOR_MODE`           | `resource_attributes::PROCESSOR_MODE`         |
/// | `env_vars::QUEUE_SIZE`               | `resource_attributes::QUEUE_SIZE`             |
/// | `env_vars::COMPRESSION_LEVEL`        | `resource_attributes::COMPRESSION_LEVEL`      |
///
/// `service.name` is always present. It is taken from `env_vars::SERVICE_NAME`, falling
/// back to `env_vars::AWS_LAMBDA_FUNCTION_NAME`, and finally to `defaults::SERVICE_NAME`.
///
/// Numeric variables that do not parse as `i64` are skipped silently, as is a memory
/// size whose conversion to bytes would overflow `i64`.
///
/// NOTE(review): `Resource::builder()` presumably also runs the SDK's default resource
/// detectors (the tests in this file expect `OTEL_RESOURCE_ATTRIBUTES` to show up in the
/// result); confirm against the `opentelemetry_sdk` version in use.
pub fn get_lambda_resource() -> Resource {
    /// Reads `name` from the environment and parses it as `i64`.
    /// Returns `None` when the variable is unset, not valid Unicode, or not an integer.
    fn env_i64<K: AsRef<std::ffi::OsStr>>(name: K) -> Option<i64> {
        env::var(name).ok()?.parse::<i64>().ok()
    }

    let mut attributes = Vec::new();

    // The cloud attributes are only reported when a region is known.
    if let Ok(region) = env::var("AWS_REGION") {
        attributes.push(KeyValue::new("cloud.provider", "aws"));
        attributes.push(KeyValue::new("cloud.region", region));
    }

    if let Ok(function_name) = env::var(env_vars::AWS_LAMBDA_FUNCTION_NAME) {
        attributes.push(KeyValue::new("faas.name", function_name));
    }

    if let Ok(version) = env::var("AWS_LAMBDA_FUNCTION_VERSION") {
        attributes.push(KeyValue::new("faas.version", version));
    }

    // The configured size is scaled by 1024 * 1024 before being reported. `checked_mul`
    // keeps an absurd value from panicking (debug) or wrapping (release); on overflow
    // the attribute is simply omitted.
    if let Some(memory_bytes) =
        env_i64("AWS_LAMBDA_FUNCTION_MEMORY_SIZE").and_then(|mb| mb.checked_mul(1024 * 1024))
    {
        attributes.push(KeyValue::new("faas.max_memory", memory_bytes));
    }

    if let Ok(log_stream) = env::var("AWS_LAMBDA_LOG_STREAM_NAME") {
        attributes.push(KeyValue::new("faas.instance", log_stream));
    }

    // Explicit service name first, then the function name, then the crate default.
    let service_name = env::var(env_vars::SERVICE_NAME)
        .or_else(|_| env::var(env_vars::AWS_LAMBDA_FUNCTION_NAME))
        .unwrap_or_else(|_| defaults::SERVICE_NAME.to_string());
    attributes.push(KeyValue::new("service.name", service_name));

    // Configuration values, recorded so they are visible alongside the telemetry.
    if let Ok(mode) = env::var(env_vars::PROCESSOR_MODE) {
        attributes.push(KeyValue::new(resource_attributes::PROCESSOR_MODE, mode));
    }

    if let Some(size) = env_i64(env_vars::QUEUE_SIZE) {
        attributes.push(KeyValue::new(resource_attributes::QUEUE_SIZE, size));
    }

    if let Some(level) = env_i64(env_vars::COMPRESSION_LEVEL) {
        attributes.push(KeyValue::new(resource_attributes::COMPRESSION_LEVEL, level));
    }

    Resource::builder().with_attributes(attributes).build()
}
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::Value;
    use serial_test::serial;
    use std::env;

    // Removes every environment variable that `get_lambda_resource` reads or that these
    // tests set, so each test starts from (and leaves behind) a clean environment.
    fn cleanup_env() {
        let names = [
            "AWS_REGION",
            env_vars::AWS_LAMBDA_FUNCTION_NAME,
            "AWS_LAMBDA_FUNCTION_VERSION",
            "AWS_LAMBDA_FUNCTION_MEMORY_SIZE",
            "AWS_LAMBDA_LOG_STREAM_NAME",
            env_vars::SERVICE_NAME,
            env_vars::RESOURCE_ATTRIBUTES,
            env_vars::QUEUE_SIZE,
            env_vars::PROCESSOR_MODE,
            env_vars::COMPRESSION_LEVEL,
        ];
        for name in names {
            env::remove_var(name);
        }
    }

    // Returns the value stored under `key` in `resource`, if any.
    fn find_attr<'a>(resource: &'a Resource, key: &str) -> Option<&'a Value> {
        resource
            .iter()
            .find(|(name, _)| name.as_str() == key)
            .map(|(_, value)| value)
    }

    // Shorthand for a string-typed attribute value.
    fn text(s: &'static str) -> Value {
        Value::String(s.into())
    }

    #[test]
    #[serial]
    fn test_get_lambda_resource_with_standard_env() {
        cleanup_env();
        for (name, value) in [
            ("AWS_REGION", "us-west-2"),
            (env_vars::AWS_LAMBDA_FUNCTION_NAME, "test-function"),
            ("AWS_LAMBDA_FUNCTION_VERSION", "$LATEST"),
            ("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "128"),
            ("AWS_LAMBDA_LOG_STREAM_NAME", "2024/01/01/[$LATEST]abc123"),
        ] {
            env::set_var(name, value);
        }

        let resource = get_lambda_resource();

        // No schema URL is expected on the generated resource.
        assert!(resource.schema_url().unwrap_or("").is_empty());

        let expected = [
            ("cloud.provider", text("aws")),
            ("cloud.region", text("us-west-2")),
            ("faas.name", text("test-function")),
            ("faas.version", text("$LATEST")),
            ("faas.max_memory", Value::I64(128 * 1024 * 1024)),
            ("faas.instance", text("2024/01/01/[$LATEST]abc123")),
        ];
        for (key, value) in &expected {
            assert_eq!(
                find_attr(&resource, key),
                Some(value),
                "unexpected value for {}",
                key
            );
        }

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_get_lambda_resource_with_no_env() {
        cleanup_env();

        let resource = get_lambda_resource();

        for key in ["cloud.provider", "cloud.region", "faas.name"] {
            assert!(
                find_attr(&resource, key).is_none(),
                "{} should be absent",
                key
            );
        }

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_get_lambda_resource_with_custom_service_name() {
        cleanup_env();
        env::set_var("AWS_LAMBDA_FUNCTION_NAME", "test-function");
        env::set_var("OTEL_SERVICE_NAME", "custom-service");

        let resource = get_lambda_resource();

        // The explicit service name wins; the function name is still reported separately.
        assert_eq!(
            find_attr(&resource, "service.name"),
            Some(&text("custom-service"))
        );
        assert_eq!(
            find_attr(&resource, "faas.name"),
            Some(&text("test-function"))
        );

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_get_lambda_resource_with_custom_attributes() {
        cleanup_env();
        env::set_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            "custom.attr=value,deployment.stage=prod",
        );

        let resource = get_lambda_resource();

        assert_eq!(find_attr(&resource, "custom.attr"), Some(&text("value")));
        assert_eq!(
            find_attr(&resource, "deployment.stage"),
            Some(&text("prod"))
        );

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_get_lambda_resource_with_encoded_attributes() {
        cleanup_env();
        env::set_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            "custom.attr=hello%20world,tag=value%3Dtest",
        );

        let resource = get_lambda_resource();

        // Percent-encoded values are expected to come through verbatim.
        assert_eq!(
            find_attr(&resource, "custom.attr"),
            Some(&text("hello%20world"))
        );
        assert_eq!(find_attr(&resource, "tag"), Some(&text("value%3Dtest")));

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_resource_attributes_only_set_when_env_vars_present() {
        cleanup_env();

        // With nothing configured, none of the configuration attributes appear.
        let bare = get_lambda_resource();
        for key in [
            resource_attributes::QUEUE_SIZE,
            resource_attributes::PROCESSOR_MODE,
            resource_attributes::COMPRESSION_LEVEL,
        ] {
            assert!(find_attr(&bare, key).is_none(), "{} should be absent", key);
        }

        env::set_var(env_vars::QUEUE_SIZE, "4096");
        env::set_var(env_vars::PROCESSOR_MODE, "async");
        env::set_var(env_vars::COMPRESSION_LEVEL, "9");

        let configured = get_lambda_resource();
        let expected = [
            (resource_attributes::QUEUE_SIZE, Value::I64(4096)),
            (resource_attributes::PROCESSOR_MODE, text("async")),
            (resource_attributes::COMPRESSION_LEVEL, Value::I64(9)),
        ];
        for (key, value) in &expected {
            assert_eq!(
                find_attr(&configured, key),
                Some(value),
                "unexpected value for {}",
                key
            );
        }

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_resource_attributes_not_set_with_invalid_env_vars() {
        cleanup_env();
        env::set_var(env_vars::QUEUE_SIZE, "not_a_number");
        env::set_var(env_vars::COMPRESSION_LEVEL, "high");

        // Non-numeric values for the numeric settings are dropped.
        let invalid = get_lambda_resource();
        assert!(find_attr(&invalid, resource_attributes::QUEUE_SIZE).is_none());
        assert!(find_attr(&invalid, resource_attributes::COMPRESSION_LEVEL).is_none());

        // The processor mode is free-form text and is passed through unchanged.
        env::set_var(env_vars::PROCESSOR_MODE, "custom_mode");
        let with_mode = get_lambda_resource();
        assert_eq!(
            find_attr(&with_mode, resource_attributes::PROCESSOR_MODE),
            Some(&text("custom_mode"))
        );

        cleanup_env();
    }
}