1use crate::resource::semconv;
7use crate::telemetry::{ReportRecord, RuntimeDoneRecord, StartRecord, TelemetryEvent};
8use crate::tracing::XRayTraceHeader;
9use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
10use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
11use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value};
12use opentelemetry_proto::tonic::metrics::v1::{
13 Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, metric::Data,
14};
15use opentelemetry_proto::tonic::resource::v1::Resource;
16use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status, status};
17use opentelemetry_semantic_conventions::SCHEMA_URL;
18use std::time::{SystemTime, UNIX_EPOCH};
19
/// Instrumentation scope name stamped on every metric and span batch this module exports.
const SCOPE_NAME: &str = "lambda-otel-extension";
/// Instrumentation scope version: the crate's package version, baked in at compile time.
const SCOPE_VERSION: &str = env!("CARGO_PKG_VERSION");
24
25pub struct MetricsConverter {
27 resource: Resource,
28}
29
30impl MetricsConverter {
31 pub fn new(resource: Resource) -> Self {
33 Self { resource }
34 }
35
36 pub fn with_defaults() -> Self {
38 Self::new(Resource::default())
39 }
40
41 pub fn set_resource(&mut self, resource: Resource) {
43 self.resource = resource.clone();
44 }
45
46 pub fn convert_report(&self, record: &ReportRecord, time: &str) -> ExportMetricsServiceRequest {
54 let timestamp_nanos = parse_iso8601_to_nanos(time).unwrap_or_else(current_time_nanos);
55
56 let mut metrics = vec![
57 self.create_gauge_metric(
58 "faas.invocation.duration",
59 "Duration of the function invocation",
60 "ms",
61 record.metrics.duration_ms,
62 timestamp_nanos,
63 vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
64 ),
65 self.create_gauge_metric(
66 "aws.lambda.billed_duration",
67 "Billed duration of the invocation",
68 "ms",
69 record.metrics.billed_duration_ms as f64,
70 timestamp_nanos,
71 vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
72 ),
73 self.create_gauge_metric(
74 "aws.lambda.max_memory_used",
75 "Maximum memory used during invocation",
76 "By",
77 (record.metrics.max_memory_used_mb * 1024 * 1024) as f64,
78 timestamp_nanos,
79 vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
80 ),
81 ];
82
83 if let Some(init_duration) = record.metrics.init_duration_ms {
85 metrics.push(self.create_gauge_metric(
86 "aws.lambda.init_duration",
87 "Cold start initialization duration",
88 "ms",
89 init_duration,
90 timestamp_nanos,
91 vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
92 ));
93 }
94
95 if let Some(restore_duration) = record.metrics.restore_duration_ms {
97 metrics.push(self.create_gauge_metric(
98 "aws.lambda.restore_duration",
99 "SnapStart restore duration",
100 "ms",
101 restore_duration,
102 timestamp_nanos,
103 vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
104 ));
105 }
106
107 ExportMetricsServiceRequest {
108 resource_metrics: vec![ResourceMetrics {
109 resource: Some(self.resource.clone()),
110 scope_metrics: vec![ScopeMetrics {
111 scope: Some(
112 opentelemetry_proto::tonic::common::v1::InstrumentationScope {
113 name: SCOPE_NAME.to_string(),
114 version: SCOPE_VERSION.to_string(),
115 ..Default::default()
116 },
117 ),
118 metrics,
119 schema_url: SCHEMA_URL.to_string(),
120 }],
121 schema_url: SCHEMA_URL.to_string(),
122 }],
123 }
124 }
125
126 pub fn create_shutdown_metric(&self, shutdown_reason: &str) -> ExportMetricsServiceRequest {
137 let timestamp_nanos = current_time_nanos();
138
139 let metric = Metric {
140 name: "extension.shutdown_count".to_string(),
141 description: "Count of extension shutdown events".to_string(),
142 unit: "{count}".to_string(),
143 data: Some(Data::Gauge(Gauge {
144 data_points: vec![NumberDataPoint {
145 attributes: vec![kv_string("shutdown.reason", shutdown_reason)],
146 start_time_unix_nano: timestamp_nanos,
147 time_unix_nano: timestamp_nanos,
148 exemplars: vec![],
149 flags: 0,
150 value: Some(
151 opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsInt(1),
152 ),
153 }],
154 })),
155 metadata: vec![],
156 };
157
158 ExportMetricsServiceRequest {
159 resource_metrics: vec![ResourceMetrics {
160 resource: Some(self.resource.clone()),
161 scope_metrics: vec![ScopeMetrics {
162 scope: Some(
163 opentelemetry_proto::tonic::common::v1::InstrumentationScope {
164 name: SCOPE_NAME.to_string(),
165 version: SCOPE_VERSION.to_string(),
166 ..Default::default()
167 },
168 ),
169 metrics: vec![metric],
170 schema_url: SCHEMA_URL.to_string(),
171 }],
172 schema_url: SCHEMA_URL.to_string(),
173 }],
174 }
175 }
176
177 fn create_gauge_metric(
178 &self,
179 name: &str,
180 description: &str,
181 unit: &str,
182 value: f64,
183 timestamp_nanos: u64,
184 attributes: Vec<KeyValue>,
185 ) -> Metric {
186 Metric {
187 name: name.to_string(),
188 description: description.to_string(),
189 unit: unit.to_string(),
190 data: Some(Data::Gauge(Gauge {
191 data_points: vec![NumberDataPoint {
192 attributes,
193 start_time_unix_nano: timestamp_nanos,
194 time_unix_nano: timestamp_nanos,
195 exemplars: vec![],
196 flags: 0,
197 value: Some(
198 opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsDouble(
199 value,
200 ),
201 ),
202 }],
203 })),
204 metadata: vec![],
205 }
206 }
207}
208
209pub struct SpanConverter {
211 resource: Resource,
212}
213
214impl SpanConverter {
215 pub fn new(resource: Resource) -> Self {
217 Self { resource }
218 }
219
220 pub fn with_defaults() -> Self {
222 Self::new(Resource::default())
223 }
224
225 pub fn set_resource(&mut self, resource: Resource) {
227 self.resource = resource.clone();
228 }
229
230 pub fn create_invocation_span(
239 &self,
240 start: &StartRecord,
241 start_time: &str,
242 runtime_done: &RuntimeDoneRecord,
243 done_time: &str,
244 ) -> ExportTraceServiceRequest {
245 let start_nanos = parse_iso8601_to_nanos(start_time).unwrap_or_else(current_time_nanos);
246 let end_nanos = parse_iso8601_to_nanos(done_time).unwrap_or_else(current_time_nanos);
247
248 let (trace_id, parent_span_id) = extract_trace_context(start);
250
251 let span = Span {
252 trace_id: trace_id.unwrap_or_else(generate_trace_id),
253 span_id: generate_span_id(),
254 parent_span_id: parent_span_id.unwrap_or_default(),
255 name: "lambda.invoke".to_string(),
256 kind: opentelemetry_proto::tonic::trace::v1::span::SpanKind::Server as i32,
257 start_time_unix_nano: start_nanos,
258 end_time_unix_nano: end_nanos,
259 attributes: vec![
260 kv_string(semconv::FAAS_INVOCATION_ID, &start.request_id),
261 kv_string("faas.invocation.status", &runtime_done.status),
262 ],
263 dropped_attributes_count: 0,
264 events: vec![],
265 dropped_events_count: 0,
266 links: vec![],
267 dropped_links_count: 0,
268 status: Some(Status {
269 code: if runtime_done.status == "success" {
270 status::StatusCode::Unset as i32
271 } else {
272 status::StatusCode::Error as i32
273 },
274 message: if runtime_done.status != "success" {
275 format!("Lambda invocation {}", runtime_done.status)
276 } else {
277 String::new()
278 },
279 }),
280 flags: 0,
281 trace_state: String::new(),
282 };
283
284 ExportTraceServiceRequest {
285 resource_spans: vec![ResourceSpans {
286 resource: Some(self.resource.clone()),
287 scope_spans: vec![ScopeSpans {
288 scope: Some(
289 opentelemetry_proto::tonic::common::v1::InstrumentationScope {
290 name: SCOPE_NAME.to_string(),
291 version: SCOPE_VERSION.to_string(),
292 ..Default::default()
293 },
294 ),
295 spans: vec![span],
296 schema_url: SCHEMA_URL.to_string(),
297 }],
298 schema_url: SCHEMA_URL.to_string(),
299 }],
300 }
301 }
302}
303
304fn extract_trace_context(start: &StartRecord) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
306 let Some(ref tracing) = start.tracing else {
307 return (None, None);
308 };
309
310 let Some(ref value) = tracing.value else {
311 return (None, None);
312 };
313
314 let Some(xray) = XRayTraceHeader::parse(value) else {
315 return (None, None);
316 };
317
318 let Some(w3c) = xray.to_w3c() else {
319 return (None, None);
320 };
321
322 let trace_id = w3c.trace_id_bytes().map(|b| b.to_vec());
323 let span_id = w3c.span_id_bytes().map(|b| b.to_vec());
324
325 (trace_id, span_id)
326}
327
328fn generate_trace_id() -> Vec<u8> {
330 rand::random::<[u8; 16]>().to_vec()
331}
332
333fn generate_span_id() -> Vec<u8> {
335 rand::random::<[u8; 8]>().to_vec()
336}
337
338fn kv_string(key: &str, value: &str) -> KeyValue {
340 KeyValue {
341 key: key.to_string(),
342 value: Some(AnyValue {
343 value: Some(any_value::Value::StringValue(value.to_string())),
344 }),
345 }
346}
347
348fn parse_iso8601_to_nanos(timestamp: &str) -> Option<u64> {
350 let ts = chrono::DateTime::parse_from_rfc3339(timestamp).ok()?;
352 Some(ts.timestamp_nanos_opt()? as u64)
353}
354
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch. The result saturates at
/// `u64::MAX` rather than silently truncating once it no longer fits in 64 bits.
fn current_time_nanos() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX))
        .unwrap_or(0)
}
362
363pub struct TelemetryProcessor {
368 metrics_converter: MetricsConverter,
369 span_converter: SpanConverter,
370 pending_starts: std::collections::HashMap<String, (StartRecord, String)>,
371}
372
373impl TelemetryProcessor {
374 pub fn new(resource: Resource) -> Self {
376 Self {
377 metrics_converter: MetricsConverter::new(resource.clone()),
378 span_converter: SpanConverter::new(resource),
379 pending_starts: std::collections::HashMap::new(),
380 }
381 }
382
383 pub fn with_defaults() -> Self {
385 Self::new(Resource::default())
386 }
387
388 pub fn set_resource(&mut self, resource: Resource) {
390 self.metrics_converter.set_resource(resource.clone());
391 self.span_converter.set_resource(resource);
392 }
393
394 pub fn process_events(
398 &mut self,
399 events: Vec<TelemetryEvent>,
400 ) -> (
401 Vec<ExportMetricsServiceRequest>,
402 Vec<ExportTraceServiceRequest>,
403 ) {
404 let mut metrics = Vec::new();
405 let mut traces = Vec::new();
406
407 for event in events {
408 match event {
409 TelemetryEvent::Start { time, record } => {
410 self.pending_starts
411 .insert(record.request_id.clone(), (record, time));
412 }
413 TelemetryEvent::RuntimeDone { time, record } => {
414 if let Some((start_record, start_time)) =
415 self.pending_starts.remove(&record.request_id)
416 {
417 let trace = self.span_converter.create_invocation_span(
418 &start_record,
419 &start_time,
420 &record,
421 &time,
422 );
423 traces.push(trace);
424 }
425 }
426 TelemetryEvent::Report { time, record } => {
427 let metric = self.metrics_converter.convert_report(&record, &time);
428 metrics.push(metric);
429 }
430 _ => {
431 tracing::trace!(?event, "Received non-invocation telemetry event");
433 }
434 }
435 }
436
437 (metrics, traces)
438 }
439
440 pub fn clear_pending(&mut self) {
444 self.pending_starts.clear();
445 }
446
447 pub fn pending_count(&self) -> usize {
449 self.pending_starts.len()
450 }
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::telemetry::{ReportMetrics, TracingRecord};
    use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;

    /// Builds a `platform.start` record without tracing context.
    fn make_start_record(request_id: &str) -> StartRecord {
        StartRecord {
            request_id: request_id.to_string(),
            version: Some("$LATEST".to_string()),
            tracing: None,
        }
    }

    /// Builds a successful `platform.runtimeDone` record.
    fn make_runtime_done_record(request_id: &str) -> RuntimeDoneRecord {
        RuntimeDoneRecord {
            request_id: request_id.to_string(),
            status: "success".to_string(),
            metrics: None,
            tracing: None,
            spans: vec![],
        }
    }

    /// Builds a successful `platform.report` record with no init/restore durations.
    fn make_report_record(request_id: &str) -> ReportRecord {
        ReportRecord {
            request_id: request_id.to_string(),
            status: "success".to_string(),
            metrics: ReportMetrics {
                duration_ms: 100.5,
                billed_duration_ms: 200,
                memory_size_mb: 128,
                max_memory_used_mb: 64,
                init_duration_ms: None,
                restore_duration_ms: None,
            },
            tracing: None,
        }
    }

    /// Returns the only data point of a gauge metric, failing the test otherwise.
    fn single_gauge_point(metric: &Metric) -> &NumberDataPoint {
        let Some(Data::Gauge(gauge)) = &metric.data else {
            panic!("Expected Gauge metric, got {:?}", metric.data);
        };
        assert_eq!(gauge.data_points.len(), 1);
        &gauge.data_points[0]
    }

    /// Looks up a string-valued attribute by key; `None` if absent or not a string.
    fn string_attr<'a>(attributes: &'a [KeyValue], key: &str) -> Option<&'a str> {
        let kv = attributes.iter().find(|kv| kv.key == key)?;
        match kv.value.as_ref()?.value.as_ref()? {
            any_value::Value::StringValue(s) => Some(s.as_str()),
            _ => None,
        }
    }

    /// Returns the only span in a single-resource, single-scope trace export request.
    fn only_span(request: &ExportTraceServiceRequest) -> &Span {
        assert_eq!(request.resource_spans.len(), 1);
        let spans = &request.resource_spans[0].scope_spans[0].spans;
        assert_eq!(spans.len(), 1);
        &spans[0]
    }

    #[test]
    fn test_convert_report_to_metrics() {
        let converter = MetricsConverter::with_defaults();
        let record = make_report_record("test-request-id");
        let time = "2022-10-12T00:00:00.000Z";

        let request = converter.convert_report(&record, time);

        assert_eq!(request.resource_metrics.len(), 1);
        let scope_metrics = &request.resource_metrics[0].scope_metrics;
        assert_eq!(scope_metrics.len(), 1);

        let metrics = &scope_metrics[0].metrics;
        assert_eq!(metrics.len(), 3);
        let names: Vec<_> = metrics.iter().map(|m| m.name.as_str()).collect();
        assert!(names.contains(&"faas.invocation.duration"));
        assert!(names.contains(&"aws.lambda.billed_duration"));
        assert!(names.contains(&"aws.lambda.max_memory_used"));

        // Every data point is tagged with the invocation's request ID.
        for metric in metrics {
            let point = single_gauge_point(metric);
            assert_eq!(
                string_attr(&point.attributes, semconv::FAAS_INVOCATION_ID),
                Some("test-request-id")
            );
        }

        // The 64 MB peak is exported in bytes.
        let memory = metrics
            .iter()
            .find(|m| m.name == "aws.lambda.max_memory_used")
            .expect("max memory metric present");
        assert_eq!(memory.unit, "By");
        assert_eq!(
            single_gauge_point(memory).value,
            Some(Value::AsDouble(64.0 * 1024.0 * 1024.0))
        );
    }

    #[test]
    fn test_convert_report_with_init_duration() {
        let converter = MetricsConverter::with_defaults();
        let mut record = make_report_record("test-request-id");
        record.metrics.init_duration_ms = Some(500.0);

        let request = converter.convert_report(&record, "2022-10-12T00:00:00.000Z");

        let metrics = &request.resource_metrics[0].scope_metrics[0].metrics;
        assert_eq!(metrics.len(), 4);
        let names: Vec<_> = metrics.iter().map(|m| m.name.as_str()).collect();
        assert!(names.contains(&"aws.lambda.init_duration"));
    }

    #[test]
    fn test_convert_report_with_restore_duration() {
        let converter = MetricsConverter::with_defaults();
        let mut record = make_report_record("test-request-id");
        record.metrics.restore_duration_ms = Some(250.0);

        let request = converter.convert_report(&record, "2022-10-12T00:00:00.000Z");

        let metrics = &request.resource_metrics[0].scope_metrics[0].metrics;
        assert_eq!(metrics.len(), 4);
        let restore = metrics
            .iter()
            .find(|m| m.name == "aws.lambda.restore_duration")
            .expect("restore duration metric present");
        assert_eq!(single_gauge_point(restore).value, Some(Value::AsDouble(250.0)));
    }

    #[test]
    fn test_create_invocation_span() {
        let converter = SpanConverter::with_defaults();
        let start = make_start_record("test-request-id");
        let done = make_runtime_done_record("test-request-id");

        let request = converter.create_invocation_span(
            &start,
            "2022-10-12T00:00:00.000Z",
            &done,
            "2022-10-12T00:00:01.000Z",
        );

        let span = only_span(&request);
        assert_eq!(span.name, "lambda.invoke");
        assert!(span.end_time_unix_nano > span.start_time_unix_nano);

        // With no X-Ray header, a fresh non-zero trace ID is generated.
        assert_eq!(span.trace_id.len(), 16);
        assert_ne!(span.trace_id, vec![0u8; 16]);

        assert_eq!(span.span_id.len(), 8);

        // A successful invocation leaves the span status unset.
        assert_eq!(
            span.status.as_ref().map(|s| s.code),
            Some(status::StatusCode::Unset as i32)
        );
    }

    #[test]
    fn test_failed_invocation_span_is_error() {
        let converter = SpanConverter::with_defaults();
        let start = make_start_record("test-request-id");
        let mut done = make_runtime_done_record("test-request-id");
        done.status = "failure".to_string();

        let request = converter.create_invocation_span(
            &start,
            "2022-10-12T00:00:00.000Z",
            &done,
            "2022-10-12T00:00:01.000Z",
        );

        let span = only_span(&request);
        let span_status = span.status.as_ref().expect("span status present");
        assert_eq!(span_status.code, status::StatusCode::Error as i32);
        assert_eq!(span_status.message, "Lambda invocation failure");
    }

    #[test]
    fn test_create_invocation_span_with_xray() {
        let converter = SpanConverter::with_defaults();
        let start = StartRecord {
            request_id: "test-request-id".to_string(),
            version: Some("$LATEST".to_string()),
            tracing: Some(TracingRecord {
                trace_type: Some("X-Amzn-Trace-Id".to_string()),
                value: Some(
                    "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1"
                        .to_string(),
                ),
                span_id: None,
            }),
        };
        let done = make_runtime_done_record("test-request-id");
        let convert = || {
            converter.create_invocation_span(
                &start,
                "2022-10-12T00:00:00.000Z",
                &done,
                "2022-10-12T00:00:01.000Z",
            )
        };

        let first = convert();
        let span = only_span(&first);
        assert_eq!(span.trace_id.len(), 16);
        assert_ne!(span.trace_id, vec![0u8; 16]);
        assert_eq!(span.parent_span_id.len(), 8);

        // The IDs come from the header rather than the RNG, so they are stable.
        let second = convert();
        let again = only_span(&second);
        assert_eq!(span.trace_id, again.trace_id);
        assert_eq!(span.parent_span_id, again.parent_span_id);
    }

    #[test]
    fn test_processor_collects_events() {
        let mut processor = TelemetryProcessor::with_defaults();

        let events = vec![
            TelemetryEvent::Start {
                time: "2022-10-12T00:00:00.000Z".to_string(),
                record: make_start_record("request-1"),
            },
            TelemetryEvent::RuntimeDone {
                time: "2022-10-12T00:00:01.000Z".to_string(),
                record: make_runtime_done_record("request-1"),
            },
            TelemetryEvent::Report {
                time: "2022-10-12T00:00:01.100Z".to_string(),
                record: make_report_record("request-1"),
            },
        ];

        let (metrics, traces) = processor.process_events(events);

        assert_eq!(metrics.len(), 1);
        assert_eq!(traces.len(), 1);
        assert_eq!(processor.pending_count(), 0);
    }

    #[test]
    fn test_processor_handles_out_of_order() {
        let mut processor = TelemetryProcessor::with_defaults();

        // The start arrives in one batch...
        let events1 = vec![TelemetryEvent::Start {
            time: "2022-10-12T00:00:00.000Z".to_string(),
            record: make_start_record("request-1"),
        }];

        let (metrics, traces) = processor.process_events(events1);
        assert_eq!(metrics.len(), 0);
        assert_eq!(traces.len(), 0);
        assert_eq!(processor.pending_count(), 1);

        // ...and its runtimeDone in the next.
        let events2 = vec![TelemetryEvent::RuntimeDone {
            time: "2022-10-12T00:00:01.000Z".to_string(),
            record: make_runtime_done_record("request-1"),
        }];

        let (metrics, traces) = processor.process_events(events2);
        assert_eq!(metrics.len(), 0);
        assert_eq!(traces.len(), 1);
        assert_eq!(processor.pending_count(), 0);
    }

    #[test]
    fn test_processor_ignores_unmatched_runtime_done() {
        let mut processor = TelemetryProcessor::with_defaults();

        let events = vec![TelemetryEvent::RuntimeDone {
            time: "2022-10-12T00:00:01.000Z".to_string(),
            record: make_runtime_done_record("never-started"),
        }];

        let (metrics, traces) = processor.process_events(events);
        assert!(metrics.is_empty());
        assert!(traces.is_empty());
        assert_eq!(processor.pending_count(), 0);
    }

    #[test]
    fn test_clear_pending_drops_buffered_starts() {
        let mut processor = TelemetryProcessor::with_defaults();

        let events = vec![TelemetryEvent::Start {
            time: "2022-10-12T00:00:00.000Z".to_string(),
            record: make_start_record("request-1"),
        }];
        processor.process_events(events);
        assert_eq!(processor.pending_count(), 1);

        processor.clear_pending();
        assert_eq!(processor.pending_count(), 0);
    }

    #[test]
    fn test_parse_iso8601() {
        assert_eq!(
            parse_iso8601_to_nanos("2022-10-12T00:00:00.000Z"),
            Some(1_665_532_800_000_000_000)
        );
        assert_eq!(
            parse_iso8601_to_nanos("2022-10-12T00:00:01.100Z"),
            Some(1_665_532_801_100_000_000)
        );

        assert!(parse_iso8601_to_nanos("invalid").is_none());
    }

    #[test]
    fn test_kv_string() {
        let kv = kv_string("key", "value");
        assert_eq!(kv.key, "key");

        match kv.value.unwrap().value.unwrap() {
            any_value::Value::StringValue(s) => assert_eq!(s, "value"),
            _ => panic!("Expected string value"),
        }
    }

    #[test]
    fn test_create_shutdown_metric() {
        let converter = MetricsConverter::with_defaults();
        let request = converter.create_shutdown_metric("spindown");

        assert_eq!(request.resource_metrics.len(), 1);
        let scope_metrics = &request.resource_metrics[0].scope_metrics;
        assert_eq!(scope_metrics.len(), 1);

        let metrics = &scope_metrics[0].metrics;
        assert_eq!(metrics.len(), 1);

        let metric = &metrics[0];
        assert_eq!(metric.name, "extension.shutdown_count");
        assert_eq!(metric.unit, "{count}");

        let point = single_gauge_point(metric);
        assert_eq!(point.value, Some(Value::AsInt(1)));
        assert_eq!(
            string_attr(&point.attributes, "shutdown.reason"),
            Some("spindown")
        );
    }

    #[test]
    fn test_shutdown_metric_different_reasons() {
        let converter = MetricsConverter::with_defaults();

        for reason in &["spindown", "timeout", "failure"] {
            let request = converter.create_shutdown_metric(reason);
            let metric = &request.resource_metrics[0].scope_metrics[0].metrics[0];

            // `single_gauge_point` / `string_attr` fail loudly instead of skipping the check.
            let point = single_gauge_point(metric);
            assert_eq!(
                string_attr(&point.attributes, "shutdown.reason"),
                Some(*reason)
            );
        }
    }
}