1use crate::constants::{env_vars, resource_attributes};
75use opentelemetry::KeyValue;
76use opentelemetry_sdk::Resource;
77use std::env;
78
79pub fn get_lambda_resource() -> Resource {
178 let mut attributes = Vec::new();
179
180 if let Ok(region) = env::var("AWS_REGION") {
182 attributes.push(KeyValue::new("cloud.provider", "aws"));
183 attributes.push(KeyValue::new("cloud.region", region));
184 }
185
186 if let Ok(function_name) = env::var(env_vars::AWS_LAMBDA_FUNCTION_NAME) {
187 attributes.push(KeyValue::new("faas.name", function_name.clone()));
188 }
189
190 if let Ok(version) = env::var("AWS_LAMBDA_FUNCTION_VERSION") {
191 attributes.push(KeyValue::new("faas.version", version));
192 }
193
194 if let Ok(memory) = env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") {
195 if let Ok(memory_mb) = memory.parse::<i64>() {
196 let memory_bytes = memory_mb * 1024 * 1024;
197 attributes.push(KeyValue::new("faas.max_memory", memory_bytes));
198 }
199 }
200
201 if let Ok(log_stream) = env::var("AWS_LAMBDA_LOG_STREAM_NAME") {
202 attributes.push(KeyValue::new("faas.instance", log_stream));
203 }
204
205 if let Ok(service_name) = env::var(env_vars::SERVICE_NAME) {
207 attributes.push(KeyValue::new("service.name", service_name));
208 }
209
210 if let Ok(mode) = env::var(env_vars::PROCESSOR_MODE) {
212 attributes.push(KeyValue::new(resource_attributes::PROCESSOR_MODE, mode));
213 }
214
215 if let Ok(queue_size) = env::var(env_vars::QUEUE_SIZE) {
216 if let Ok(size) = queue_size.parse::<i64>() {
217 attributes.push(KeyValue::new(resource_attributes::QUEUE_SIZE, size));
218 }
219 }
220
221 if let Ok(batch_size) = env::var(env_vars::BATCH_SIZE) {
222 if let Ok(size) = batch_size.parse::<i64>() {
223 attributes.push(KeyValue::new(resource_attributes::BATCH_SIZE, size));
224 }
225 }
226
227 if let Ok(compression_level) = env::var(env_vars::COMPRESSION_LEVEL) {
228 if let Ok(level) = compression_level.parse::<i64>() {
229 attributes.push(KeyValue::new(resource_attributes::COMPRESSION_LEVEL, level));
230 }
231 }
232
233 Resource::builder().with_attributes(attributes).build()
235}
236
237#[cfg(test)]
238mod tests {
239 use super::*;
240 use serial_test::serial;
241 use std::env;
242
243 fn cleanup_env() {
244 env::remove_var("AWS_REGION");
245 env::remove_var(env_vars::AWS_LAMBDA_FUNCTION_NAME);
246 env::remove_var("AWS_LAMBDA_FUNCTION_VERSION");
247 env::remove_var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE");
248 env::remove_var("AWS_LAMBDA_LOG_STREAM_NAME");
249 env::remove_var(env_vars::SERVICE_NAME);
250 env::remove_var(env_vars::RESOURCE_ATTRIBUTES);
251 env::remove_var(env_vars::BATCH_SIZE);
252 env::remove_var(env_vars::QUEUE_SIZE);
253 env::remove_var(env_vars::PROCESSOR_MODE);
254 env::remove_var(env_vars::COMPRESSION_LEVEL);
255 }
256
257 fn find_attr<'a>(
259 attrs: &'a [(&'a str, &'a opentelemetry::Value)],
260 key: &str,
261 ) -> Option<&'a opentelemetry::Value> {
262 attrs.iter().find(|(k, _)| *k == key).map(|(_, v)| *v)
263 }
264
265 #[test]
266 #[serial]
267 fn test_get_lambda_resource_with_standard_env() {
268 cleanup_env();
269
270 env::set_var("AWS_REGION", "us-west-2");
272 env::set_var(env_vars::AWS_LAMBDA_FUNCTION_NAME, "test-function");
273 env::set_var("AWS_LAMBDA_FUNCTION_VERSION", "$LATEST");
274 env::set_var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "128");
275 env::set_var("AWS_LAMBDA_LOG_STREAM_NAME", "2024/01/01/[$LATEST]abc123");
276
277 let resource = get_lambda_resource();
278 let schema = resource.schema_url().unwrap_or("");
279 assert!(schema.is_empty()); let attrs: Vec<_> = resource.iter().map(|(k, v)| (k.as_str(), v)).collect();
283
284 assert_eq!(
285 find_attr(&attrs, "cloud.provider"),
286 Some(&opentelemetry::Value::String("aws".into()))
287 );
288 assert_eq!(
289 find_attr(&attrs, "cloud.region"),
290 Some(&opentelemetry::Value::String("us-west-2".into()))
291 );
292 assert_eq!(
293 find_attr(&attrs, "faas.name"),
294 Some(&opentelemetry::Value::String("test-function".into()))
295 );
296 assert_eq!(
297 find_attr(&attrs, "faas.version"),
298 Some(&opentelemetry::Value::String("$LATEST".into()))
299 );
300
301 assert_eq!(
303 find_attr(&attrs, "faas.max_memory"),
304 Some(&opentelemetry::Value::I64(128 * 1024 * 1024))
305 );
306 assert_eq!(
307 find_attr(&attrs, "faas.instance"),
308 Some(&opentelemetry::Value::String(
309 "2024/01/01/[$LATEST]abc123".into()
310 ))
311 );
312
313 cleanup_env();
314 }
315
316 #[test]
317 #[serial]
318 fn test_get_lambda_resource_with_no_env() {
319 cleanup_env();
320
321 let resource = get_lambda_resource();
322 let attrs: Vec<_> = resource.iter().map(|(k, v)| (k.as_str(), v)).collect();
323
324 assert!(find_attr(&attrs, "cloud.provider").is_none());
326 assert!(find_attr(&attrs, "cloud.region").is_none());
327 assert!(find_attr(&attrs, "faas.name").is_none());
328
329 cleanup_env();
330 }
331
332 #[test]
333 #[serial]
334 fn test_get_lambda_resource_with_custom_service_name() {
335 cleanup_env();
336
337 env::set_var("AWS_LAMBDA_FUNCTION_NAME", "test-function");
339 env::set_var("OTEL_SERVICE_NAME", "custom-service");
340
341 let resource = get_lambda_resource();
342 let attrs: Vec<_> = resource.iter().collect();
343
344 let find_attr = |key: &str| -> Option<&opentelemetry::Value> {
345 attrs.iter().find(|kv| kv.0.as_str() == key).map(|kv| kv.1)
346 };
347
348 assert_eq!(
349 find_attr("service.name"),
350 Some(&opentelemetry::Value::String("custom-service".into()))
351 );
352 assert_eq!(
353 find_attr("faas.name"),
354 Some(&opentelemetry::Value::String("test-function".into()))
355 );
356
357 cleanup_env();
358 }
359
360 #[test]
361 #[serial]
362 fn test_get_lambda_resource_with_custom_attributes() {
363 cleanup_env();
364
365 env::set_var(
367 "OTEL_RESOURCE_ATTRIBUTES",
368 "custom.attr=value,deployment.stage=prod",
369 );
370
371 let resource = get_lambda_resource();
372 let attrs: Vec<_> = resource.iter().collect();
373
374 let find_attr = |key: &str| -> Option<&opentelemetry::Value> {
375 attrs.iter().find(|kv| kv.0.as_str() == key).map(|kv| kv.1)
376 };
377
378 assert_eq!(
379 find_attr("custom.attr"),
380 Some(&opentelemetry::Value::String("value".into()))
381 );
382 assert_eq!(
383 find_attr("deployment.stage"),
384 Some(&opentelemetry::Value::String("prod".into()))
385 );
386
387 cleanup_env();
388 }
389
390 #[test]
391 #[serial]
392 fn test_get_lambda_resource_with_encoded_attributes() {
393 cleanup_env();
394
395 env::set_var(
397 "OTEL_RESOURCE_ATTRIBUTES",
398 "custom.attr=hello%20world,tag=value%3Dtest",
399 );
400
401 let resource = get_lambda_resource();
402 let attrs: Vec<_> = resource.iter().collect();
403
404 let find_attr = |key: &str| -> Option<&opentelemetry::Value> {
405 attrs.iter().find(|kv| kv.0.as_str() == key).map(|kv| kv.1)
406 };
407
408 assert_eq!(
409 find_attr("custom.attr"),
410 Some(&opentelemetry::Value::String("hello%20world".into()))
411 );
412 assert_eq!(
413 find_attr("tag"),
414 Some(&opentelemetry::Value::String("value%3Dtest".into()))
415 );
416
417 cleanup_env();
418 }
419
420 #[test]
421 #[serial]
422 fn test_resource_attributes_only_set_when_env_vars_present() {
423 cleanup_env();
424
425 let resource = get_lambda_resource();
427 let attrs: Vec<_> = resource.iter().map(|(k, v)| (k.as_str(), v)).collect();
428
429 assert!(find_attr(&attrs, resource_attributes::QUEUE_SIZE).is_none());
431 assert!(find_attr(&attrs, resource_attributes::BATCH_SIZE).is_none());
432 assert!(find_attr(&attrs, resource_attributes::PROCESSOR_MODE).is_none());
433 assert!(find_attr(&attrs, resource_attributes::COMPRESSION_LEVEL).is_none());
434
435 env::set_var(env_vars::QUEUE_SIZE, "4096");
437 env::set_var(env_vars::BATCH_SIZE, "1024");
438 env::set_var(env_vars::PROCESSOR_MODE, "async");
439 env::set_var(env_vars::COMPRESSION_LEVEL, "9");
440
441 let resource_with_env = get_lambda_resource();
443 let attrs_with_env: Vec<_> = resource_with_env
444 .iter()
445 .map(|(k, v)| (k.as_str(), v))
446 .collect();
447
448 assert_eq!(
450 find_attr(&attrs_with_env, resource_attributes::QUEUE_SIZE),
451 Some(&opentelemetry::Value::I64(4096))
452 );
453 assert_eq!(
454 find_attr(&attrs_with_env, resource_attributes::BATCH_SIZE),
455 Some(&opentelemetry::Value::I64(1024))
456 );
457 assert_eq!(
458 find_attr(&attrs_with_env, resource_attributes::PROCESSOR_MODE),
459 Some(&opentelemetry::Value::String("async".into()))
460 );
461 assert_eq!(
462 find_attr(&attrs_with_env, resource_attributes::COMPRESSION_LEVEL),
463 Some(&opentelemetry::Value::I64(9))
464 );
465
466 cleanup_env();
467 }
468
469 #[test]
470 #[serial]
471 fn test_resource_attributes_not_set_with_invalid_env_vars() {
472 cleanup_env();
473
474 env::set_var(env_vars::QUEUE_SIZE, "not_a_number");
476 env::set_var(env_vars::BATCH_SIZE, "invalid");
477 env::set_var(env_vars::COMPRESSION_LEVEL, "high");
478
479 let resource = get_lambda_resource();
481 let attrs: Vec<_> = resource.iter().map(|(k, v)| (k.as_str(), v)).collect();
482
483 assert!(find_attr(&attrs, resource_attributes::QUEUE_SIZE).is_none());
485 assert!(find_attr(&attrs, resource_attributes::BATCH_SIZE).is_none());
486 assert!(find_attr(&attrs, resource_attributes::COMPRESSION_LEVEL).is_none());
487
488 env::set_var(env_vars::PROCESSOR_MODE, "custom_mode");
490 let resource_with_mode = get_lambda_resource();
491 let attrs_with_mode: Vec<_> = resource_with_mode
492 .iter()
493 .map(|(k, v)| (k.as_str(), v))
494 .collect();
495
496 assert_eq!(
497 find_attr(&attrs_with_mode, resource_attributes::PROCESSOR_MODE),
498 Some(&opentelemetry::Value::String("custom_mode".into()))
499 );
500
501 cleanup_env();
502 }
503}