opentelemetry_lambda_extension/
conversion.rs

1//! Platform event to OTLP signal conversion.
2//!
3//! This module converts Lambda Telemetry API platform events into OpenTelemetry
4//! signals (metrics and spans) following semantic conventions.
5
6use crate::resource::semconv;
7use crate::telemetry::{ReportRecord, RuntimeDoneRecord, StartRecord, TelemetryEvent};
8use crate::tracing::XRayTraceHeader;
9use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
10use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
11use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value};
12use opentelemetry_proto::tonic::metrics::v1::{
13    Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, metric::Data,
14};
15use opentelemetry_proto::tonic::resource::v1::Resource;
16use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status, status};
17use opentelemetry_semantic_conventions::SCHEMA_URL;
18use std::time::{SystemTime, UNIX_EPOCH};
19
20/// Scope name for instrumentation.
21const SCOPE_NAME: &str = "lambda-otel-extension";
22/// Scope version for instrumentation.
23const SCOPE_VERSION: &str = env!("CARGO_PKG_VERSION");
24
25/// Converts platform events to OTLP metrics.
26pub struct MetricsConverter {
27    resource: Resource,
28}
29
30impl MetricsConverter {
31    /// Creates a new metrics converter with the given resource attributes.
32    pub fn new(resource: Resource) -> Self {
33        Self { resource }
34    }
35
36    /// Creates a converter with default resource attributes.
37    pub fn with_defaults() -> Self {
38        Self::new(Resource::default())
39    }
40
41    /// Sets the resource for this converter.
42    pub fn set_resource(&mut self, resource: Resource) {
43        self.resource = resource.clone();
44    }
45
46    /// Converts a report event to OTLP metrics.
47    ///
48    /// Generates the following metrics:
49    /// - `faas.invocation.duration` - Duration of the invocation in milliseconds
50    /// - `aws.lambda.billed_duration` - Billed duration in milliseconds
51    /// - `aws.lambda.max_memory_used` - Maximum memory used in bytes
52    /// - `aws.lambda.init_duration` (cold start only) - Init duration in milliseconds
53    pub fn convert_report(&self, record: &ReportRecord, time: &str) -> ExportMetricsServiceRequest {
54        let timestamp_nanos = parse_iso8601_to_nanos(time).unwrap_or_else(current_time_nanos);
55
56        let mut metrics = vec![
57            self.create_gauge_metric(
58                "faas.invocation.duration",
59                "Duration of the function invocation",
60                "ms",
61                record.metrics.duration_ms,
62                timestamp_nanos,
63                vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
64            ),
65            self.create_gauge_metric(
66                "aws.lambda.billed_duration",
67                "Billed duration of the invocation",
68                "ms",
69                record.metrics.billed_duration_ms as f64,
70                timestamp_nanos,
71                vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
72            ),
73            self.create_gauge_metric(
74                "aws.lambda.max_memory_used",
75                "Maximum memory used during invocation",
76                "By",
77                (record.metrics.max_memory_used_mb * 1024 * 1024) as f64,
78                timestamp_nanos,
79                vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
80            ),
81        ];
82
83        // Add init_duration for cold starts
84        if let Some(init_duration) = record.metrics.init_duration_ms {
85            metrics.push(self.create_gauge_metric(
86                "aws.lambda.init_duration",
87                "Cold start initialization duration",
88                "ms",
89                init_duration,
90                timestamp_nanos,
91                vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
92            ));
93        }
94
95        // Add restore_duration for SnapStart
96        if let Some(restore_duration) = record.metrics.restore_duration_ms {
97            metrics.push(self.create_gauge_metric(
98                "aws.lambda.restore_duration",
99                "SnapStart restore duration",
100                "ms",
101                restore_duration,
102                timestamp_nanos,
103                vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
104            ));
105        }
106
107        ExportMetricsServiceRequest {
108            resource_metrics: vec![ResourceMetrics {
109                resource: Some(self.resource.clone()),
110                scope_metrics: vec![ScopeMetrics {
111                    scope: Some(
112                        opentelemetry_proto::tonic::common::v1::InstrumentationScope {
113                            name: SCOPE_NAME.to_string(),
114                            version: SCOPE_VERSION.to_string(),
115                            ..Default::default()
116                        },
117                    ),
118                    metrics,
119                    schema_url: SCHEMA_URL.to_string(),
120                }],
121                schema_url: SCHEMA_URL.to_string(),
122            }],
123        }
124    }
125
126    /// Creates a shutdown count metric.
127    ///
128    /// This metric is emitted when the extension receives a SHUTDOWN event,
129    /// indicating the Lambda environment is being terminated. The metric
130    /// includes the `faas.name` resource attribute to identify which function
131    /// is shutting down.
132    ///
133    /// # Arguments
134    ///
135    /// * `shutdown_reason` - The reason for shutdown (e.g., "spindown", "timeout", "failure")
136    pub fn create_shutdown_metric(&self, shutdown_reason: &str) -> ExportMetricsServiceRequest {
137        let timestamp_nanos = current_time_nanos();
138
139        let metric = Metric {
140            name: "extension.shutdown_count".to_string(),
141            description: "Count of extension shutdown events".to_string(),
142            unit: "{count}".to_string(),
143            data: Some(Data::Gauge(Gauge {
144                data_points: vec![NumberDataPoint {
145                    attributes: vec![kv_string("shutdown.reason", shutdown_reason)],
146                    start_time_unix_nano: timestamp_nanos,
147                    time_unix_nano: timestamp_nanos,
148                    exemplars: vec![],
149                    flags: 0,
150                    value: Some(
151                        opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsInt(1),
152                    ),
153                }],
154            })),
155            metadata: vec![],
156        };
157
158        ExportMetricsServiceRequest {
159            resource_metrics: vec![ResourceMetrics {
160                resource: Some(self.resource.clone()),
161                scope_metrics: vec![ScopeMetrics {
162                    scope: Some(
163                        opentelemetry_proto::tonic::common::v1::InstrumentationScope {
164                            name: SCOPE_NAME.to_string(),
165                            version: SCOPE_VERSION.to_string(),
166                            ..Default::default()
167                        },
168                    ),
169                    metrics: vec![metric],
170                    schema_url: SCHEMA_URL.to_string(),
171                }],
172                schema_url: SCHEMA_URL.to_string(),
173            }],
174        }
175    }
176
177    fn create_gauge_metric(
178        &self,
179        name: &str,
180        description: &str,
181        unit: &str,
182        value: f64,
183        timestamp_nanos: u64,
184        attributes: Vec<KeyValue>,
185    ) -> Metric {
186        Metric {
187            name: name.to_string(),
188            description: description.to_string(),
189            unit: unit.to_string(),
190            data: Some(Data::Gauge(Gauge {
191                data_points: vec![NumberDataPoint {
192                    attributes,
193                    start_time_unix_nano: timestamp_nanos,
194                    time_unix_nano: timestamp_nanos,
195                    exemplars: vec![],
196                    flags: 0,
197                    value: Some(
198                        opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsDouble(
199                            value,
200                        ),
201                    ),
202                }],
203            })),
204            metadata: vec![],
205        }
206    }
207}
208
209/// Converts platform events to OTLP spans.
210pub struct SpanConverter {
211    resource: Resource,
212}
213
214impl SpanConverter {
215    /// Creates a new span converter with the given resource attributes.
216    pub fn new(resource: Resource) -> Self {
217        Self { resource }
218    }
219
220    /// Creates a converter with default resource attributes.
221    pub fn with_defaults() -> Self {
222        Self::new(Resource::default())
223    }
224
225    /// Sets the resource for this converter.
226    pub fn set_resource(&mut self, resource: Resource) {
227        self.resource = resource.clone();
228    }
229
230    /// Creates an invocation span from start and runtime_done events.
231    ///
232    /// # Arguments
233    ///
234    /// * `start` - The platform.start event
235    /// * `start_time` - ISO 8601 timestamp of start event
236    /// * `runtime_done` - The platform.runtimeDone event
237    /// * `done_time` - ISO 8601 timestamp of runtimeDone event
238    pub fn create_invocation_span(
239        &self,
240        start: &StartRecord,
241        start_time: &str,
242        runtime_done: &RuntimeDoneRecord,
243        done_time: &str,
244    ) -> ExportTraceServiceRequest {
245        let start_nanos = parse_iso8601_to_nanos(start_time).unwrap_or_else(current_time_nanos);
246        let end_nanos = parse_iso8601_to_nanos(done_time).unwrap_or_else(current_time_nanos);
247
248        // Extract trace context from X-Ray header if available
249        let (trace_id, parent_span_id) = extract_trace_context(start);
250
251        let span = Span {
252            trace_id: trace_id.unwrap_or_else(generate_trace_id),
253            span_id: generate_span_id(),
254            parent_span_id: parent_span_id.unwrap_or_default(),
255            name: "lambda.invoke".to_string(),
256            kind: opentelemetry_proto::tonic::trace::v1::span::SpanKind::Server as i32,
257            start_time_unix_nano: start_nanos,
258            end_time_unix_nano: end_nanos,
259            attributes: vec![
260                kv_string(semconv::FAAS_INVOCATION_ID, &start.request_id),
261                kv_string("faas.invocation.status", &runtime_done.status),
262            ],
263            dropped_attributes_count: 0,
264            events: vec![],
265            dropped_events_count: 0,
266            links: vec![],
267            dropped_links_count: 0,
268            status: Some(Status {
269                code: if runtime_done.status == "success" {
270                    status::StatusCode::Unset as i32
271                } else {
272                    status::StatusCode::Error as i32
273                },
274                message: if runtime_done.status != "success" {
275                    format!("Lambda invocation {}", runtime_done.status)
276                } else {
277                    String::new()
278                },
279            }),
280            flags: 0,
281            trace_state: String::new(),
282        };
283
284        ExportTraceServiceRequest {
285            resource_spans: vec![ResourceSpans {
286                resource: Some(self.resource.clone()),
287                scope_spans: vec![ScopeSpans {
288                    scope: Some(
289                        opentelemetry_proto::tonic::common::v1::InstrumentationScope {
290                            name: SCOPE_NAME.to_string(),
291                            version: SCOPE_VERSION.to_string(),
292                            ..Default::default()
293                        },
294                    ),
295                    spans: vec![span],
296                    schema_url: SCHEMA_URL.to_string(),
297                }],
298                schema_url: SCHEMA_URL.to_string(),
299            }],
300        }
301    }
302}
303
304/// Extracts trace context from a start record's tracing info.
305fn extract_trace_context(start: &StartRecord) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
306    let Some(ref tracing) = start.tracing else {
307        return (None, None);
308    };
309
310    let Some(ref value) = tracing.value else {
311        return (None, None);
312    };
313
314    let Some(xray) = XRayTraceHeader::parse(value) else {
315        return (None, None);
316    };
317
318    let Some(w3c) = xray.to_w3c() else {
319        return (None, None);
320    };
321
322    let trace_id = w3c.trace_id_bytes().map(|b| b.to_vec());
323    let span_id = w3c.span_id_bytes().map(|b| b.to_vec());
324
325    (trace_id, span_id)
326}
327
328/// Generates a random trace ID per OpenTelemetry specification.
329fn generate_trace_id() -> Vec<u8> {
330    rand::random::<[u8; 16]>().to_vec()
331}
332
333/// Generates a random span ID per OpenTelemetry specification.
334fn generate_span_id() -> Vec<u8> {
335    rand::random::<[u8; 8]>().to_vec()
336}
337
338/// Creates a string key-value pair.
339fn kv_string(key: &str, value: &str) -> KeyValue {
340    KeyValue {
341        key: key.to_string(),
342        value: Some(AnyValue {
343            value: Some(any_value::Value::StringValue(value.to_string())),
344        }),
345    }
346}
347
348/// Parses an ISO 8601 timestamp to nanoseconds since Unix epoch.
349fn parse_iso8601_to_nanos(timestamp: &str) -> Option<u64> {
350    // Simple ISO 8601 parsing (2022-10-12T00:00:00.000Z)
351    let ts = chrono::DateTime::parse_from_rfc3339(timestamp).ok()?;
352    Some(ts.timestamp_nanos_opt()? as u64)
353}
354
355/// Returns the current time in nanoseconds since Unix epoch.
356fn current_time_nanos() -> u64 {
357    SystemTime::now()
358        .duration_since(UNIX_EPOCH)
359        .map(|d| d.as_nanos() as u64)
360        .unwrap_or(0)
361}
362
363/// Batch processor for telemetry events.
364///
365/// Collects related events (start + runtimeDone + report) and converts them
366/// to OTLP signals when complete.
367pub struct TelemetryProcessor {
368    metrics_converter: MetricsConverter,
369    span_converter: SpanConverter,
370    pending_starts: std::collections::HashMap<String, (StartRecord, String)>,
371}
372
373impl TelemetryProcessor {
374    /// Creates a new telemetry processor.
375    pub fn new(resource: Resource) -> Self {
376        Self {
377            metrics_converter: MetricsConverter::new(resource.clone()),
378            span_converter: SpanConverter::new(resource),
379            pending_starts: std::collections::HashMap::new(),
380        }
381    }
382
383    /// Creates a processor with default resource.
384    pub fn with_defaults() -> Self {
385        Self::new(Resource::default())
386    }
387
388    /// Sets the resource for this processor.
389    pub fn set_resource(&mut self, resource: Resource) {
390        self.metrics_converter.set_resource(resource.clone());
391        self.span_converter.set_resource(resource);
392    }
393
394    /// Processes a batch of telemetry events.
395    ///
396    /// Returns generated OTLP signals (metrics and traces).
397    pub fn process_events(
398        &mut self,
399        events: Vec<TelemetryEvent>,
400    ) -> (
401        Vec<ExportMetricsServiceRequest>,
402        Vec<ExportTraceServiceRequest>,
403    ) {
404        let mut metrics = Vec::new();
405        let mut traces = Vec::new();
406
407        for event in events {
408            match event {
409                TelemetryEvent::Start { time, record } => {
410                    self.pending_starts
411                        .insert(record.request_id.clone(), (record, time));
412                }
413                TelemetryEvent::RuntimeDone { time, record } => {
414                    if let Some((start_record, start_time)) =
415                        self.pending_starts.remove(&record.request_id)
416                    {
417                        let trace = self.span_converter.create_invocation_span(
418                            &start_record,
419                            &start_time,
420                            &record,
421                            &time,
422                        );
423                        traces.push(trace);
424                    }
425                }
426                TelemetryEvent::Report { time, record } => {
427                    let metric = self.metrics_converter.convert_report(&record, &time);
428                    metrics.push(metric);
429                }
430                _ => {
431                    // Other events (init, fault, logs) are logged but not converted
432                    tracing::trace!(?event, "Received non-invocation telemetry event");
433                }
434            }
435        }
436
437        (metrics, traces)
438    }
439
440    /// Clears any pending start events.
441    ///
442    /// Call this during shutdown to avoid memory leaks.
443    pub fn clear_pending(&mut self) {
444        self.pending_starts.clear();
445    }
446
447    /// Returns the number of pending start events.
448    pub fn pending_count(&self) -> usize {
449        self.pending_starts.len()
450    }
451}
452
453#[cfg(test)]
454mod tests {
455    use super::*;
456    use crate::telemetry::{ReportMetrics, TracingRecord};
457
458    fn make_start_record(request_id: &str) -> StartRecord {
459        StartRecord {
460            request_id: request_id.to_string(),
461            version: Some("$LATEST".to_string()),
462            tracing: None,
463        }
464    }
465
466    fn make_runtime_done_record(request_id: &str) -> RuntimeDoneRecord {
467        RuntimeDoneRecord {
468            request_id: request_id.to_string(),
469            status: "success".to_string(),
470            metrics: None,
471            tracing: None,
472            spans: vec![],
473        }
474    }
475
476    fn make_report_record(request_id: &str) -> ReportRecord {
477        ReportRecord {
478            request_id: request_id.to_string(),
479            status: "success".to_string(),
480            metrics: ReportMetrics {
481                duration_ms: 100.5,
482                billed_duration_ms: 200,
483                memory_size_mb: 128,
484                max_memory_used_mb: 64,
485                init_duration_ms: None,
486                restore_duration_ms: None,
487            },
488            tracing: None,
489        }
490    }
491
492    #[test]
493    fn test_convert_report_to_metrics() {
494        let converter = MetricsConverter::with_defaults();
495        let record = make_report_record("test-request-id");
496        let time = "2022-10-12T00:00:00.000Z";
497
498        let request = converter.convert_report(&record, time);
499
500        assert_eq!(request.resource_metrics.len(), 1);
501        let scope_metrics = &request.resource_metrics[0].scope_metrics;
502        assert_eq!(scope_metrics.len(), 1);
503
504        let metrics = &scope_metrics[0].metrics;
505        assert_eq!(metrics.len(), 3); // duration, billed_duration, max_memory_used
506
507        // Check metric names
508        let names: Vec<_> = metrics.iter().map(|m| m.name.as_str()).collect();
509        assert!(names.contains(&"faas.invocation.duration"));
510        assert!(names.contains(&"aws.lambda.billed_duration"));
511        assert!(names.contains(&"aws.lambda.max_memory_used"));
512    }
513
514    #[test]
515    fn test_convert_report_with_init_duration() {
516        let converter = MetricsConverter::with_defaults();
517        let mut record = make_report_record("test-request-id");
518        record.metrics.init_duration_ms = Some(500.0);
519
520        let request = converter.convert_report(&record, "2022-10-12T00:00:00.000Z");
521
522        let metrics = &request.resource_metrics[0].scope_metrics[0].metrics;
523        assert_eq!(metrics.len(), 4); // includes init_duration
524
525        let names: Vec<_> = metrics.iter().map(|m| m.name.as_str()).collect();
526        assert!(names.contains(&"aws.lambda.init_duration"));
527    }
528
529    #[test]
530    fn test_create_invocation_span() {
531        let converter = SpanConverter::with_defaults();
532        let start = make_start_record("test-request-id");
533        let done = make_runtime_done_record("test-request-id");
534
535        let request = converter.create_invocation_span(
536            &start,
537            "2022-10-12T00:00:00.000Z",
538            &done,
539            "2022-10-12T00:00:01.000Z",
540        );
541
542        assert_eq!(request.resource_spans.len(), 1);
543        let spans = &request.resource_spans[0].scope_spans[0].spans;
544        assert_eq!(spans.len(), 1);
545
546        let span = &spans[0];
547        assert_eq!(span.name, "lambda.invoke");
548        assert!(span.end_time_unix_nano > span.start_time_unix_nano);
549
550        // Verify trace ID is valid (16 bytes, not all zeros)
551        assert_eq!(span.trace_id.len(), 16);
552        assert_ne!(span.trace_id, vec![0u8; 16]);
553
554        // Verify span ID is valid (8 bytes)
555        assert_eq!(span.span_id.len(), 8);
556    }
557
558    #[test]
559    fn test_create_invocation_span_with_xray() {
560        let converter = SpanConverter::with_defaults();
561        let start = StartRecord {
562            request_id: "test-request-id".to_string(),
563            version: Some("$LATEST".to_string()),
564            tracing: Some(TracingRecord {
565                trace_type: Some("X-Amzn-Trace-Id".to_string()),
566                value: Some(
567                    "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1"
568                        .to_string(),
569                ),
570                span_id: None,
571            }),
572        };
573        let done = make_runtime_done_record("test-request-id");
574
575        let request = converter.create_invocation_span(
576            &start,
577            "2022-10-12T00:00:00.000Z",
578            &done,
579            "2022-10-12T00:00:01.000Z",
580        );
581
582        let span = &request.resource_spans[0].scope_spans[0].spans[0];
583
584        // Verify trace ID was extracted from X-Ray header
585        assert_eq!(span.trace_id.len(), 16);
586        assert_ne!(span.trace_id, vec![0u8; 16]); // Not all zeros
587
588        // Verify parent span ID was extracted
589        assert_eq!(span.parent_span_id.len(), 8);
590    }
591
592    #[test]
593    fn test_processor_collects_events() {
594        let mut processor = TelemetryProcessor::with_defaults();
595
596        let events = vec![
597            TelemetryEvent::Start {
598                time: "2022-10-12T00:00:00.000Z".to_string(),
599                record: make_start_record("request-1"),
600            },
601            TelemetryEvent::RuntimeDone {
602                time: "2022-10-12T00:00:01.000Z".to_string(),
603                record: make_runtime_done_record("request-1"),
604            },
605            TelemetryEvent::Report {
606                time: "2022-10-12T00:00:01.100Z".to_string(),
607                record: make_report_record("request-1"),
608            },
609        ];
610
611        let (metrics, traces) = processor.process_events(events);
612
613        assert_eq!(metrics.len(), 1);
614        assert_eq!(traces.len(), 1);
615        assert_eq!(processor.pending_count(), 0);
616    }
617
618    #[test]
619    fn test_processor_handles_out_of_order() {
620        let mut processor = TelemetryProcessor::with_defaults();
621
622        // Send start first
623        let events1 = vec![TelemetryEvent::Start {
624            time: "2022-10-12T00:00:00.000Z".to_string(),
625            record: make_start_record("request-1"),
626        }];
627
628        let (metrics, traces) = processor.process_events(events1);
629        assert_eq!(metrics.len(), 0);
630        assert_eq!(traces.len(), 0);
631        assert_eq!(processor.pending_count(), 1);
632
633        // Send runtime_done
634        let events2 = vec![TelemetryEvent::RuntimeDone {
635            time: "2022-10-12T00:00:01.000Z".to_string(),
636            record: make_runtime_done_record("request-1"),
637        }];
638
639        let (metrics, traces) = processor.process_events(events2);
640        assert_eq!(metrics.len(), 0);
641        assert_eq!(traces.len(), 1);
642        assert_eq!(processor.pending_count(), 0);
643    }
644
645    #[test]
646    fn test_parse_iso8601() {
647        let ts = parse_iso8601_to_nanos("2022-10-12T00:00:00.000Z");
648        assert!(ts.is_some());
649
650        let invalid = parse_iso8601_to_nanos("invalid");
651        assert!(invalid.is_none());
652    }
653
654    #[test]
655    fn test_kv_string() {
656        let kv = kv_string("key", "value");
657        assert_eq!(kv.key, "key");
658
659        match kv.value.unwrap().value.unwrap() {
660            any_value::Value::StringValue(s) => assert_eq!(s, "value"),
661            _ => panic!("Expected string value"),
662        }
663    }
664
665    #[test]
666    fn test_create_shutdown_metric() {
667        use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
668
669        let converter = MetricsConverter::with_defaults();
670        let request = converter.create_shutdown_metric("spindown");
671
672        assert_eq!(request.resource_metrics.len(), 1);
673        let scope_metrics = &request.resource_metrics[0].scope_metrics;
674        assert_eq!(scope_metrics.len(), 1);
675
676        let metrics = &scope_metrics[0].metrics;
677        assert_eq!(metrics.len(), 1);
678
679        let metric = &metrics[0];
680        assert_eq!(metric.name, "extension.shutdown_count");
681        assert_eq!(metric.unit, "{count}");
682
683        // Check the metric has a value of 1
684        if let Some(opentelemetry_proto::tonic::metrics::v1::metric::Data::Gauge(gauge)) =
685            &metric.data
686        {
687            assert_eq!(gauge.data_points.len(), 1);
688            let data_point = &gauge.data_points[0];
689
690            // Check value is 1
691            match data_point.value {
692                Some(Value::AsInt(val)) => assert_eq!(val, 1),
693                _ => panic!("Expected integer value of 1"),
694            }
695
696            // Check shutdown reason attribute
697            let attrs: std::collections::HashMap<_, _> = data_point
698                .attributes
699                .iter()
700                .map(|kv| (kv.key.as_str(), kv.value.as_ref()))
701                .collect();
702            assert!(attrs.contains_key("shutdown.reason"));
703        } else {
704            panic!("Expected Gauge metric");
705        }
706    }
707
708    #[test]
709    fn test_shutdown_metric_different_reasons() {
710        let converter = MetricsConverter::with_defaults();
711
712        for reason in &["spindown", "timeout", "failure"] {
713            let request = converter.create_shutdown_metric(reason);
714            let metric = &request.resource_metrics[0].scope_metrics[0].metrics[0];
715
716            if let Some(opentelemetry_proto::tonic::metrics::v1::metric::Data::Gauge(gauge)) =
717                &metric.data
718            {
719                let attr = &gauge.data_points[0].attributes[0];
720                assert_eq!(attr.key, "shutdown.reason");
721
722                if let Some(any_value::Value::StringValue(val)) =
723                    attr.value.as_ref().and_then(|v| v.value.as_ref())
724                {
725                    assert_eq!(val, *reason);
726                }
727            }
728        }
729    }
730}