1use crate::constants::defaults;
75use crate::constants::{env_vars, resource_attributes};
76use opentelemetry::KeyValue;
77use opentelemetry_sdk::Resource;
78use std::env;
79
80pub fn get_lambda_resource() -> Resource {
179 let mut attributes = Vec::new();
180
181 if let Ok(region) = env::var("AWS_REGION") {
183 attributes.push(KeyValue::new("cloud.provider", "aws"));
184 attributes.push(KeyValue::new("cloud.region", region));
185 }
186
187 if let Ok(function_name) = env::var(env_vars::AWS_LAMBDA_FUNCTION_NAME) {
188 attributes.push(KeyValue::new("faas.name", function_name.clone()));
189 }
190
191 if let Ok(version) = env::var("AWS_LAMBDA_FUNCTION_VERSION") {
192 attributes.push(KeyValue::new("faas.version", version));
193 }
194
195 if let Ok(memory) = env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") {
196 if let Ok(memory_mb) = memory.parse::<i64>() {
197 let memory_bytes = memory_mb * 1024 * 1024;
198 attributes.push(KeyValue::new("faas.max_memory", memory_bytes));
199 }
200 }
201
202 if let Ok(log_stream) = env::var("AWS_LAMBDA_LOG_STREAM_NAME") {
203 attributes.push(KeyValue::new("faas.instance", log_stream));
204 }
205
206 let service_name = env::var(env_vars::SERVICE_NAME)
211 .or_else(|_| env::var(env_vars::AWS_LAMBDA_FUNCTION_NAME))
212 .unwrap_or_else(|_| defaults::SERVICE_NAME.to_string());
213
214 attributes.push(KeyValue::new("service.name", service_name));
215
216 if let Ok(mode) = env::var(env_vars::PROCESSOR_MODE) {
218 attributes.push(KeyValue::new(resource_attributes::PROCESSOR_MODE, mode));
219 }
220
221 if let Ok(queue_size) = env::var(env_vars::QUEUE_SIZE) {
222 if let Ok(size) = queue_size.parse::<i64>() {
223 attributes.push(KeyValue::new(resource_attributes::QUEUE_SIZE, size));
224 }
225 }
226
227 if let Ok(batch_size) = env::var(env_vars::BATCH_SIZE) {
228 if let Ok(size) = batch_size.parse::<i64>() {
229 attributes.push(KeyValue::new(resource_attributes::BATCH_SIZE, size));
230 }
231 }
232
233 if let Ok(compression_level) = env::var(env_vars::COMPRESSION_LEVEL) {
234 if let Ok(level) = compression_level.parse::<i64>() {
235 attributes.push(KeyValue::new(resource_attributes::COMPRESSION_LEVEL, level));
236 }
237 }
238
239 Resource::builder().with_attributes(attributes).build()
241}
242
#[cfg(test)]
mod tests {
    //! Tests for `get_lambda_resource`.
    //!
    //! The process environment is global state, so every test is marked
    //! `#[serial]` and starts and ends with `cleanup_env()`.

    use super::*;
    use serial_test::serial;
    use std::env;

    /// Removes every environment variable that `get_lambda_resource` reads or
    /// that these tests set, so each test starts from a known-empty state.
    fn cleanup_env() {
        env::remove_var("AWS_REGION");
        env::remove_var(env_vars::AWS_LAMBDA_FUNCTION_NAME);
        env::remove_var("AWS_LAMBDA_FUNCTION_VERSION");
        env::remove_var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE");
        env::remove_var("AWS_LAMBDA_LOG_STREAM_NAME");
        env::remove_var(env_vars::SERVICE_NAME);
        env::remove_var(env_vars::RESOURCE_ATTRIBUTES);
        env::remove_var(env_vars::BATCH_SIZE);
        env::remove_var(env_vars::QUEUE_SIZE);
        env::remove_var(env_vars::PROCESSOR_MODE);
        env::remove_var(env_vars::COMPRESSION_LEVEL);
    }

    /// Flattens a resource into `(key, value)` pairs borrowed from it.
    fn resource_attrs(resource: &Resource) -> Vec<(&str, &opentelemetry::Value)> {
        resource.iter().map(|(k, v)| (k.as_str(), v)).collect()
    }

    /// Returns the value stored under `key`, or `None` when the key is absent.
    fn find_attr<'a>(
        attrs: &'a [(&'a str, &'a opentelemetry::Value)],
        key: &str,
    ) -> Option<&'a opentelemetry::Value> {
        attrs.iter().find(|(k, _)| *k == key).map(|(_, v)| *v)
    }

    #[test]
    #[serial]
    fn test_get_lambda_resource_with_standard_env() {
        cleanup_env();

        env::set_var("AWS_REGION", "us-west-2");
        env::set_var(env_vars::AWS_LAMBDA_FUNCTION_NAME, "test-function");
        env::set_var("AWS_LAMBDA_FUNCTION_VERSION", "$LATEST");
        env::set_var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "128");
        env::set_var("AWS_LAMBDA_LOG_STREAM_NAME", "2024/01/01/[$LATEST]abc123");

        let resource = get_lambda_resource();

        // No schema URL is expected on the built resource.
        let schema = resource.schema_url().unwrap_or("");
        assert!(schema.is_empty());

        let attrs = resource_attrs(&resource);

        assert_eq!(
            find_attr(&attrs, "cloud.provider"),
            Some(&opentelemetry::Value::String("aws".into()))
        );
        assert_eq!(
            find_attr(&attrs, "cloud.region"),
            Some(&opentelemetry::Value::String("us-west-2".into()))
        );
        assert_eq!(
            find_attr(&attrs, "faas.name"),
            Some(&opentelemetry::Value::String("test-function".into()))
        );
        assert_eq!(
            find_attr(&attrs, "faas.version"),
            Some(&opentelemetry::Value::String("$LATEST".into()))
        );

        // "128" is scaled by 1024 * 1024 before being reported.
        assert_eq!(
            find_attr(&attrs, "faas.max_memory"),
            Some(&opentelemetry::Value::I64(128 * 1024 * 1024))
        );
        assert_eq!(
            find_attr(&attrs, "faas.instance"),
            Some(&opentelemetry::Value::String(
                "2024/01/01/[$LATEST]abc123".into()
            ))
        );

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_get_lambda_resource_with_no_env() {
        cleanup_env();

        let resource = get_lambda_resource();
        let attrs = resource_attrs(&resource);

        assert!(find_attr(&attrs, "cloud.provider").is_none());
        assert!(find_attr(&attrs, "cloud.region").is_none());
        assert!(find_attr(&attrs, "faas.name").is_none());

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_get_lambda_resource_with_custom_service_name() {
        cleanup_env();

        // Set through the same constants that `cleanup_env` clears.
        env::set_var(env_vars::AWS_LAMBDA_FUNCTION_NAME, "test-function");
        env::set_var(env_vars::SERVICE_NAME, "custom-service");

        let resource = get_lambda_resource();
        let attrs = resource_attrs(&resource);

        // The explicit service name wins over the function name...
        assert_eq!(
            find_attr(&attrs, "service.name"),
            Some(&opentelemetry::Value::String("custom-service".into()))
        );
        // ...while the function name is still reported as `faas.name`.
        assert_eq!(
            find_attr(&attrs, "faas.name"),
            Some(&opentelemetry::Value::String("test-function".into()))
        );

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_get_lambda_resource_with_custom_attributes() {
        cleanup_env();

        env::set_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            "custom.attr=value,deployment.stage=prod",
        );

        let resource = get_lambda_resource();
        let attrs = resource_attrs(&resource);

        assert_eq!(
            find_attr(&attrs, "custom.attr"),
            Some(&opentelemetry::Value::String("value".into()))
        );
        assert_eq!(
            find_attr(&attrs, "deployment.stage"),
            Some(&opentelemetry::Value::String("prod".into()))
        );

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_get_lambda_resource_with_encoded_attributes() {
        cleanup_env();

        env::set_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            "custom.attr=hello%20world,tag=value%3Dtest",
        );

        let resource = get_lambda_resource();
        let attrs = resource_attrs(&resource);

        // Percent-encoded values are expected to come through verbatim.
        assert_eq!(
            find_attr(&attrs, "custom.attr"),
            Some(&opentelemetry::Value::String("hello%20world".into()))
        );
        assert_eq!(
            find_attr(&attrs, "tag"),
            Some(&opentelemetry::Value::String("value%3Dtest".into()))
        );

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_resource_attributes_only_set_when_env_vars_present() {
        cleanup_env();

        // With nothing configured, none of the processor attributes appear.
        let resource = get_lambda_resource();
        let attrs = resource_attrs(&resource);

        assert!(find_attr(&attrs, resource_attributes::QUEUE_SIZE).is_none());
        assert!(find_attr(&attrs, resource_attributes::BATCH_SIZE).is_none());
        assert!(find_attr(&attrs, resource_attributes::PROCESSOR_MODE).is_none());
        assert!(find_attr(&attrs, resource_attributes::COMPRESSION_LEVEL).is_none());

        env::set_var(env_vars::QUEUE_SIZE, "4096");
        env::set_var(env_vars::BATCH_SIZE, "1024");
        env::set_var(env_vars::PROCESSOR_MODE, "async");
        env::set_var(env_vars::COMPRESSION_LEVEL, "9");

        // Once configured, each one is reported with its parsed value.
        let resource_with_env = get_lambda_resource();
        let attrs_with_env = resource_attrs(&resource_with_env);

        assert_eq!(
            find_attr(&attrs_with_env, resource_attributes::QUEUE_SIZE),
            Some(&opentelemetry::Value::I64(4096))
        );
        assert_eq!(
            find_attr(&attrs_with_env, resource_attributes::BATCH_SIZE),
            Some(&opentelemetry::Value::I64(1024))
        );
        assert_eq!(
            find_attr(&attrs_with_env, resource_attributes::PROCESSOR_MODE),
            Some(&opentelemetry::Value::String("async".into()))
        );
        assert_eq!(
            find_attr(&attrs_with_env, resource_attributes::COMPRESSION_LEVEL),
            Some(&opentelemetry::Value::I64(9))
        );

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_resource_attributes_not_set_with_invalid_env_vars() {
        cleanup_env();

        env::set_var(env_vars::QUEUE_SIZE, "not_a_number");
        env::set_var(env_vars::BATCH_SIZE, "invalid");
        env::set_var(env_vars::COMPRESSION_LEVEL, "high");

        // Numeric settings that do not parse are dropped rather than reported.
        let resource = get_lambda_resource();
        let attrs = resource_attrs(&resource);

        assert!(find_attr(&attrs, resource_attributes::QUEUE_SIZE).is_none());
        assert!(find_attr(&attrs, resource_attributes::BATCH_SIZE).is_none());
        assert!(find_attr(&attrs, resource_attributes::COMPRESSION_LEVEL).is_none());

        // The processor mode is a free-form string, so any value is accepted.
        env::set_var(env_vars::PROCESSOR_MODE, "custom_mode");
        let resource_with_mode = get_lambda_resource();
        let attrs_with_mode = resource_attrs(&resource_with_mode);

        assert_eq!(
            find_attr(&attrs_with_mode, resource_attributes::PROCESSOR_MODE),
            Some(&opentelemetry::Value::String("custom_mode".into()))
        );

        cleanup_env();
    }
}