1use crate::resource::semconv;
7use crate::telemetry::{ReportRecord, RuntimeDoneRecord, StartRecord, TelemetryEvent};
8use crate::tracing::XRayTraceHeader;
9use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
10use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
11use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value};
12use opentelemetry_proto::tonic::metrics::v1::{
13 Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, metric::Data,
14};
15use opentelemetry_proto::tonic::resource::v1::Resource;
16use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status, status};
17use opentelemetry_semantic_conventions::SCHEMA_URL;
18use std::time::{SystemTime, UNIX_EPOCH};
19
/// Instrumentation scope name attached to every metric and span batch this module exports.
const SCOPE_NAME: &str = "lambda-otel-extension";
/// Instrumentation scope version: this crate's version, captured at compile time from Cargo.
const SCOPE_VERSION: &str = env!("CARGO_PKG_VERSION");
24
25pub struct MetricsConverter {
27 resource: Resource,
28}
29
30impl MetricsConverter {
31 pub fn new(resource: Resource) -> Self {
33 Self { resource }
34 }
35
36 pub fn with_defaults() -> Self {
38 Self::new(Resource::default())
39 }
40
41 pub fn set_resource(&mut self, resource: Resource) {
43 self.resource = resource.clone();
44 }
45
46 pub fn convert_report(&self, record: &ReportRecord, time: &str) -> ExportMetricsServiceRequest {
54 let timestamp_nanos = parse_iso8601_to_nanos(time).unwrap_or_else(current_time_nanos);
55
56 let mut metrics = vec![
57 self.create_gauge_metric(
58 "faas.invocation.duration",
59 "Duration of the function invocation",
60 "ms",
61 record.metrics.duration_ms,
62 timestamp_nanos,
63 vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
64 ),
65 self.create_gauge_metric(
66 "aws.lambda.billed_duration",
67 "Billed duration of the invocation",
68 "ms",
69 record.metrics.billed_duration_ms as f64,
70 timestamp_nanos,
71 vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
72 ),
73 self.create_gauge_metric(
74 "aws.lambda.max_memory_used",
75 "Maximum memory used during invocation",
76 "By",
77 (record.metrics.max_memory_used_mb * 1024 * 1024) as f64,
78 timestamp_nanos,
79 vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
80 ),
81 ];
82
83 if let Some(init_duration) = record.metrics.init_duration_ms {
85 metrics.push(self.create_gauge_metric(
86 "aws.lambda.init_duration",
87 "Cold start initialization duration",
88 "ms",
89 init_duration,
90 timestamp_nanos,
91 vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
92 ));
93 }
94
95 if let Some(restore_duration) = record.metrics.restore_duration_ms {
97 metrics.push(self.create_gauge_metric(
98 "aws.lambda.restore_duration",
99 "SnapStart restore duration",
100 "ms",
101 restore_duration,
102 timestamp_nanos,
103 vec![kv_string(semconv::FAAS_INVOCATION_ID, &record.request_id)],
104 ));
105 }
106
107 ExportMetricsServiceRequest {
108 resource_metrics: vec![ResourceMetrics {
109 resource: Some(self.resource.clone()),
110 scope_metrics: vec![ScopeMetrics {
111 scope: Some(
112 opentelemetry_proto::tonic::common::v1::InstrumentationScope {
113 name: SCOPE_NAME.to_string(),
114 version: SCOPE_VERSION.to_string(),
115 ..Default::default()
116 },
117 ),
118 metrics,
119 schema_url: SCHEMA_URL.to_string(),
120 }],
121 schema_url: SCHEMA_URL.to_string(),
122 }],
123 }
124 }
125
126 pub fn create_shutdown_metric(&self, shutdown_reason: &str) -> ExportMetricsServiceRequest {
137 let timestamp_nanos = current_time_nanos();
138
139 let metric = Metric {
140 name: "extension.shutdown_count".to_string(),
141 description: "Count of extension shutdown events".to_string(),
142 unit: "{count}".to_string(),
143 data: Some(Data::Gauge(Gauge {
144 data_points: vec![NumberDataPoint {
145 attributes: vec![kv_string("shutdown.reason", shutdown_reason)],
146 start_time_unix_nano: timestamp_nanos,
147 time_unix_nano: timestamp_nanos,
148 exemplars: vec![],
149 flags: 0,
150 value: Some(
151 opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsInt(1),
152 ),
153 }],
154 })),
155 metadata: vec![],
156 };
157
158 ExportMetricsServiceRequest {
159 resource_metrics: vec![ResourceMetrics {
160 resource: Some(self.resource.clone()),
161 scope_metrics: vec![ScopeMetrics {
162 scope: Some(
163 opentelemetry_proto::tonic::common::v1::InstrumentationScope {
164 name: SCOPE_NAME.to_string(),
165 version: SCOPE_VERSION.to_string(),
166 ..Default::default()
167 },
168 ),
169 metrics: vec![metric],
170 schema_url: SCHEMA_URL.to_string(),
171 }],
172 schema_url: SCHEMA_URL.to_string(),
173 }],
174 }
175 }
176
177 fn create_gauge_metric(
178 &self,
179 name: &str,
180 description: &str,
181 unit: &str,
182 value: f64,
183 timestamp_nanos: u64,
184 attributes: Vec<KeyValue>,
185 ) -> Metric {
186 Metric {
187 name: name.to_string(),
188 description: description.to_string(),
189 unit: unit.to_string(),
190 data: Some(Data::Gauge(Gauge {
191 data_points: vec![NumberDataPoint {
192 attributes,
193 start_time_unix_nano: timestamp_nanos,
194 time_unix_nano: timestamp_nanos,
195 exemplars: vec![],
196 flags: 0,
197 value: Some(
198 opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsDouble(
199 value,
200 ),
201 ),
202 }],
203 })),
204 metadata: vec![],
205 }
206 }
207}
208
209pub struct SpanConverter {
211 resource: Resource,
212}
213
214impl SpanConverter {
215 pub fn new(resource: Resource) -> Self {
217 Self { resource }
218 }
219
220 pub fn with_defaults() -> Self {
222 Self::new(Resource::default())
223 }
224
225 pub fn set_resource(&mut self, resource: Resource) {
227 self.resource = resource.clone();
228 }
229
230 pub fn create_invocation_span(
239 &self,
240 start: &StartRecord,
241 start_time: &str,
242 runtime_done: &RuntimeDoneRecord,
243 done_time: &str,
244 ) -> ExportTraceServiceRequest {
245 let start_nanos = parse_iso8601_to_nanos(start_time).unwrap_or_else(current_time_nanos);
246 let end_nanos = parse_iso8601_to_nanos(done_time).unwrap_or_else(current_time_nanos);
247
248 let (trace_id, parent_span_id) = extract_trace_context(start);
250
251 let span = Span {
252 trace_id: trace_id.unwrap_or_else(generate_trace_id),
253 span_id: generate_span_id(),
254 parent_span_id: parent_span_id.unwrap_or_default(),
255 name: "lambda.invoke".to_string(),
256 kind: opentelemetry_proto::tonic::trace::v1::span::SpanKind::Server as i32,
257 start_time_unix_nano: start_nanos,
258 end_time_unix_nano: end_nanos,
259 attributes: vec![
260 kv_string(semconv::FAAS_INVOCATION_ID, &start.request_id),
261 kv_string("faas.invocation.status", &runtime_done.status),
262 ],
263 dropped_attributes_count: 0,
264 events: vec![],
265 dropped_events_count: 0,
266 links: vec![],
267 dropped_links_count: 0,
268 status: Some(Status {
269 code: if runtime_done.status == "success" {
270 status::StatusCode::Ok as i32
271 } else {
272 status::StatusCode::Error as i32
273 },
274 message: String::new(),
275 }),
276 flags: 0,
277 trace_state: String::new(),
278 };
279
280 ExportTraceServiceRequest {
281 resource_spans: vec![ResourceSpans {
282 resource: Some(self.resource.clone()),
283 scope_spans: vec![ScopeSpans {
284 scope: Some(
285 opentelemetry_proto::tonic::common::v1::InstrumentationScope {
286 name: SCOPE_NAME.to_string(),
287 version: SCOPE_VERSION.to_string(),
288 ..Default::default()
289 },
290 ),
291 spans: vec![span],
292 schema_url: SCHEMA_URL.to_string(),
293 }],
294 schema_url: SCHEMA_URL.to_string(),
295 }],
296 }
297 }
298}
299
300fn extract_trace_context(start: &StartRecord) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
302 let Some(ref tracing) = start.tracing else {
303 return (None, None);
304 };
305
306 let Some(ref value) = tracing.value else {
307 return (None, None);
308 };
309
310 let Some(xray) = XRayTraceHeader::parse(value) else {
311 return (None, None);
312 };
313
314 let Some(w3c) = xray.to_w3c() else {
315 return (None, None);
316 };
317
318 let trace_id = w3c.trace_id_bytes().map(|b| b.to_vec());
319 let span_id = w3c.span_id_bytes().map(|b| b.to_vec());
320
321 (trace_id, span_id)
322}
323
324fn generate_trace_id() -> Vec<u8> {
326 rand::random::<[u8; 16]>().to_vec()
327}
328
329fn generate_span_id() -> Vec<u8> {
331 rand::random::<[u8; 8]>().to_vec()
332}
333
334fn kv_string(key: &str, value: &str) -> KeyValue {
336 KeyValue {
337 key: key.to_string(),
338 value: Some(AnyValue {
339 value: Some(any_value::Value::StringValue(value.to_string())),
340 }),
341 }
342}
343
344fn parse_iso8601_to_nanos(timestamp: &str) -> Option<u64> {
346 let ts = chrono::DateTime::parse_from_rfc3339(timestamp).ok()?;
348 Some(ts.timestamp_nanos_opt()? as u64)
349}
350
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch. The `u128`
/// nanosecond count is narrowed with `as`, which only truncates for dates
/// past roughly the year 2554.
fn current_time_nanos() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
358
359pub struct TelemetryProcessor {
364 metrics_converter: MetricsConverter,
365 span_converter: SpanConverter,
366 pending_starts: std::collections::HashMap<String, (StartRecord, String)>,
367}
368
369impl TelemetryProcessor {
370 pub fn new(resource: Resource) -> Self {
372 Self {
373 metrics_converter: MetricsConverter::new(resource.clone()),
374 span_converter: SpanConverter::new(resource),
375 pending_starts: std::collections::HashMap::new(),
376 }
377 }
378
379 pub fn with_defaults() -> Self {
381 Self::new(Resource::default())
382 }
383
384 pub fn set_resource(&mut self, resource: Resource) {
386 self.metrics_converter.set_resource(resource.clone());
387 self.span_converter.set_resource(resource);
388 }
389
390 pub fn process_events(
394 &mut self,
395 events: Vec<TelemetryEvent>,
396 ) -> (
397 Vec<ExportMetricsServiceRequest>,
398 Vec<ExportTraceServiceRequest>,
399 ) {
400 let mut metrics = Vec::new();
401 let mut traces = Vec::new();
402
403 for event in events {
404 match event {
405 TelemetryEvent::Start { time, record } => {
406 self.pending_starts
407 .insert(record.request_id.clone(), (record, time));
408 }
409 TelemetryEvent::RuntimeDone { time, record } => {
410 if let Some((start_record, start_time)) =
411 self.pending_starts.remove(&record.request_id)
412 {
413 let trace = self.span_converter.create_invocation_span(
414 &start_record,
415 &start_time,
416 &record,
417 &time,
418 );
419 traces.push(trace);
420 }
421 }
422 TelemetryEvent::Report { time, record } => {
423 let metric = self.metrics_converter.convert_report(&record, &time);
424 metrics.push(metric);
425 }
426 _ => {
427 tracing::trace!(?event, "Received non-invocation telemetry event");
429 }
430 }
431 }
432
433 (metrics, traces)
434 }
435
436 pub fn clear_pending(&mut self) {
440 self.pending_starts.clear();
441 }
442
443 pub fn pending_count(&self) -> usize {
445 self.pending_starts.len()
446 }
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use crate::telemetry::{ReportMetrics, TracingRecord};
    use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;

    /// 2022-10-12T00:00:00Z as nanoseconds since the Unix epoch.
    const OCT_12_2022_NANOS: u64 = 1_665_532_800_000_000_000;

    fn make_start_record(request_id: &str) -> StartRecord {
        StartRecord {
            request_id: request_id.to_string(),
            version: Some("$LATEST".to_string()),
            tracing: None,
        }
    }

    fn make_runtime_done_record(request_id: &str) -> RuntimeDoneRecord {
        make_runtime_done_record_with_status(request_id, "success")
    }

    fn make_runtime_done_record_with_status(request_id: &str, status: &str) -> RuntimeDoneRecord {
        RuntimeDoneRecord {
            request_id: request_id.to_string(),
            status: status.to_string(),
            metrics: None,
            tracing: None,
            spans: vec![],
        }
    }

    fn make_report_record(request_id: &str) -> ReportRecord {
        ReportRecord {
            request_id: request_id.to_string(),
            status: "success".to_string(),
            metrics: ReportMetrics {
                duration_ms: 100.5,
                billed_duration_ms: 200,
                memory_size_mb: 128,
                max_memory_used_mb: 64,
                init_duration_ms: None,
                restore_duration_ms: None,
            },
            tracing: None,
        }
    }

    /// All metrics of the (single) resource/scope in `request`.
    fn metrics_of(request: &ExportMetricsServiceRequest) -> &[Metric] {
        &request.resource_metrics[0].scope_metrics[0].metrics
    }

    /// The metric called `name`, panicking if it is absent.
    fn metric_named<'a>(request: &'a ExportMetricsServiceRequest, name: &str) -> &'a Metric {
        metrics_of(request)
            .iter()
            .find(|metric| metric.name == name)
            .unwrap_or_else(|| panic!("metric {name} not found"))
    }

    /// The only data point of a gauge metric, panicking on any other shape.
    fn single_gauge_point(metric: &Metric) -> &NumberDataPoint {
        let Some(Data::Gauge(gauge)) = &metric.data else {
            panic!("expected {} to be a gauge, got {:?}", metric.name, metric.data);
        };
        assert_eq!(
            gauge.data_points.len(),
            1,
            "{} should have exactly one data point",
            metric.name
        );
        &gauge.data_points[0]
    }

    /// The double value of `point`, panicking if it is not `AsDouble`.
    fn double_value(point: &NumberDataPoint) -> f64 {
        match &point.value {
            Some(Value::AsDouble(value)) => *value,
            other => panic!("expected a double value, got {other:?}"),
        }
    }

    /// The string value of attribute `key`, panicking if absent or not a string.
    fn string_attr<'a>(attributes: &'a [KeyValue], key: &str) -> &'a str {
        let kv = attributes
            .iter()
            .find(|kv| kv.key == key)
            .unwrap_or_else(|| panic!("missing attribute {key}"));
        match kv.value.as_ref().and_then(|v| v.value.as_ref()) {
            Some(any_value::Value::StringValue(s)) => s,
            other => panic!("attribute {key} is not a string: {other:?}"),
        }
    }

    #[test]
    fn test_convert_report_to_metrics() {
        let converter = MetricsConverter::with_defaults();
        let record = make_report_record("test-request-id");

        let request = converter.convert_report(&record, "2022-10-12T00:00:00.000Z");

        assert_eq!(request.resource_metrics.len(), 1);
        let scope_metrics = &request.resource_metrics[0].scope_metrics;
        assert_eq!(scope_metrics.len(), 1);
        assert_eq!(
            scope_metrics[0].scope.as_ref().map(|scope| scope.name.as_str()),
            Some(SCOPE_NAME)
        );
        assert_eq!(scope_metrics[0].metrics.len(), 3);

        let duration = single_gauge_point(metric_named(&request, "faas.invocation.duration"));
        assert_eq!(double_value(duration), 100.5);
        assert_eq!(duration.time_unix_nano, OCT_12_2022_NANOS);
        assert_eq!(
            string_attr(&duration.attributes, semconv::FAAS_INVOCATION_ID),
            "test-request-id"
        );

        let billed = single_gauge_point(metric_named(&request, "aws.lambda.billed_duration"));
        assert_eq!(double_value(billed), 200.0);

        // Lambda reports MB; the metric is exported in bytes.
        let memory = single_gauge_point(metric_named(&request, "aws.lambda.max_memory_used"));
        assert_eq!(double_value(memory), 64.0 * 1024.0 * 1024.0);
    }

    #[test]
    fn test_convert_report_with_init_duration() {
        let converter = MetricsConverter::with_defaults();
        let mut record = make_report_record("test-request-id");
        record.metrics.init_duration_ms = Some(500.0);

        let request = converter.convert_report(&record, "2022-10-12T00:00:00.000Z");

        assert_eq!(metrics_of(&request).len(), 4);
        let init = single_gauge_point(metric_named(&request, "aws.lambda.init_duration"));
        assert_eq!(double_value(init), 500.0);
    }

    #[test]
    fn test_convert_report_with_restore_duration() {
        let converter = MetricsConverter::with_defaults();
        let mut record = make_report_record("test-request-id");
        record.metrics.restore_duration_ms = Some(250.0);

        let request = converter.convert_report(&record, "2022-10-12T00:00:00.000Z");

        assert_eq!(metrics_of(&request).len(), 4);
        let restore = single_gauge_point(metric_named(&request, "aws.lambda.restore_duration"));
        assert_eq!(double_value(restore), 250.0);
    }

    #[test]
    fn test_create_invocation_span() {
        let converter = SpanConverter::with_defaults();
        let start = make_start_record("test-request-id");
        let done = make_runtime_done_record("test-request-id");

        let request = converter.create_invocation_span(
            &start,
            "2022-10-12T00:00:00.000Z",
            &done,
            "2022-10-12T00:00:01.000Z",
        );

        assert_eq!(request.resource_spans.len(), 1);
        let spans = &request.resource_spans[0].scope_spans[0].spans;
        assert_eq!(spans.len(), 1);

        let span = &spans[0];
        assert_eq!(span.name, "lambda.invoke");
        assert_eq!(
            span.kind,
            opentelemetry_proto::tonic::trace::v1::span::SpanKind::Server as i32
        );
        assert_eq!(span.start_time_unix_nano, OCT_12_2022_NANOS);
        assert_eq!(span.end_time_unix_nano, OCT_12_2022_NANOS + 1_000_000_000);

        // Without an X-Ray header, IDs are generated locally and there is no parent.
        assert_eq!(span.trace_id.len(), 16);
        assert_ne!(span.trace_id, vec![0u8; 16]);
        assert_eq!(span.span_id.len(), 8);
        assert!(span.parent_span_id.is_empty());

        assert_eq!(
            string_attr(&span.attributes, semconv::FAAS_INVOCATION_ID),
            "test-request-id"
        );
        assert_eq!(string_attr(&span.attributes, "faas.invocation.status"), "success");
        assert_eq!(
            span.status.as_ref().map(|s| s.code),
            Some(status::StatusCode::Ok as i32)
        );
    }

    #[test]
    fn test_create_invocation_span_reports_error_status() {
        let converter = SpanConverter::with_defaults();
        let start = make_start_record("test-request-id");
        let done = make_runtime_done_record_with_status("test-request-id", "timeout");

        let request = converter.create_invocation_span(
            &start,
            "2022-10-12T00:00:00.000Z",
            &done,
            "2022-10-12T00:00:01.000Z",
        );

        let span = &request.resource_spans[0].scope_spans[0].spans[0];
        assert_eq!(string_attr(&span.attributes, "faas.invocation.status"), "timeout");
        assert_eq!(
            span.status.as_ref().map(|s| s.code),
            Some(status::StatusCode::Error as i32)
        );
    }

    #[test]
    fn test_create_invocation_span_with_xray() {
        let converter = SpanConverter::with_defaults();
        let start = StartRecord {
            request_id: "test-request-id".to_string(),
            version: Some("$LATEST".to_string()),
            tracing: Some(TracingRecord {
                trace_type: Some("X-Amzn-Trace-Id".to_string()),
                value: Some(
                    "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1"
                        .to_string(),
                ),
                span_id: None,
            }),
        };
        let done = make_runtime_done_record("test-request-id");

        let request = converter.create_invocation_span(
            &start,
            "2022-10-12T00:00:00.000Z",
            &done,
            "2022-10-12T00:00:01.000Z",
        );

        let span = &request.resource_spans[0].scope_spans[0].spans[0];

        // The W3C trace ID is the X-Ray epoch + unique ID: 5759e988bd862e3fe1be46a994272793.
        assert_eq!(
            span.trace_id,
            vec![
                0x57, 0x59, 0xe9, 0x88, 0xbd, 0x86, 0x2e, 0x3f, 0xe1, 0xbe, 0x46, 0xa9, 0x94,
                0x27, 0x27, 0x93,
            ]
        );
        // The parent span is the X-Ray Parent segment ID: 53995c3f42cd8ad8.
        assert_eq!(
            span.parent_span_id,
            vec![0x53, 0x99, 0x5c, 0x3f, 0x42, 0xcd, 0x8a, 0xd8]
        );
        assert_eq!(span.span_id.len(), 8);
    }

    #[test]
    fn test_processor_collects_events() {
        let mut processor = TelemetryProcessor::with_defaults();

        let events = vec![
            TelemetryEvent::Start {
                time: "2022-10-12T00:00:00.000Z".to_string(),
                record: make_start_record("request-1"),
            },
            TelemetryEvent::RuntimeDone {
                time: "2022-10-12T00:00:01.000Z".to_string(),
                record: make_runtime_done_record("request-1"),
            },
            TelemetryEvent::Report {
                time: "2022-10-12T00:00:01.100Z".to_string(),
                record: make_report_record("request-1"),
            },
        ];

        let (metrics, traces) = processor.process_events(events);

        assert_eq!(metrics.len(), 1);
        assert_eq!(traces.len(), 1);
        assert_eq!(processor.pending_count(), 0);
    }

    /// A `Start` and its `RuntimeDone` delivered in separate batches are still paired.
    #[test]
    fn test_processor_handles_out_of_order() {
        let mut processor = TelemetryProcessor::with_defaults();

        let events1 = vec![TelemetryEvent::Start {
            time: "2022-10-12T00:00:00.000Z".to_string(),
            record: make_start_record("request-1"),
        }];

        let (metrics, traces) = processor.process_events(events1);
        assert_eq!(metrics.len(), 0);
        assert_eq!(traces.len(), 0);
        assert_eq!(processor.pending_count(), 1);

        let events2 = vec![TelemetryEvent::RuntimeDone {
            time: "2022-10-12T00:00:01.000Z".to_string(),
            record: make_runtime_done_record("request-1"),
        }];

        let (metrics, traces) = processor.process_events(events2);
        assert_eq!(metrics.len(), 0);
        assert_eq!(traces.len(), 1);
        assert_eq!(processor.pending_count(), 0);
    }

    /// A `RuntimeDone` that arrives before (or without) its `Start` yields no span.
    #[test]
    fn test_processor_runtime_done_without_start_is_dropped() {
        let mut processor = TelemetryProcessor::with_defaults();

        let (metrics, traces) = processor.process_events(vec![TelemetryEvent::RuntimeDone {
            time: "2022-10-12T00:00:01.000Z".to_string(),
            record: make_runtime_done_record("request-1"),
        }]);
        assert!(metrics.is_empty());
        assert!(traces.is_empty());

        let (_, traces) = processor.process_events(vec![TelemetryEvent::Start {
            time: "2022-10-12T00:00:00.000Z".to_string(),
            record: make_start_record("request-1"),
        }]);
        assert!(traces.is_empty());
        assert_eq!(processor.pending_count(), 1);

        processor.clear_pending();
        assert_eq!(processor.pending_count(), 0);
    }

    #[test]
    fn test_processor_set_resource_propagates() {
        let mut processor = TelemetryProcessor::with_defaults();
        let resource = Resource {
            attributes: vec![kv_string("service.name", "my-function")],
            ..Default::default()
        };
        processor.set_resource(resource.clone());

        let (metrics, traces) = processor.process_events(vec![
            TelemetryEvent::Start {
                time: "2022-10-12T00:00:00.000Z".to_string(),
                record: make_start_record("request-1"),
            },
            TelemetryEvent::RuntimeDone {
                time: "2022-10-12T00:00:01.000Z".to_string(),
                record: make_runtime_done_record("request-1"),
            },
            TelemetryEvent::Report {
                time: "2022-10-12T00:00:01.100Z".to_string(),
                record: make_report_record("request-1"),
            },
        ]);

        assert_eq!(
            metrics[0].resource_metrics[0].resource.as_ref(),
            Some(&resource)
        );
        assert_eq!(traces[0].resource_spans[0].resource.as_ref(), Some(&resource));
    }

    #[test]
    fn test_parse_iso8601() {
        assert_eq!(parse_iso8601_to_nanos("1970-01-01T00:00:00Z"), Some(0));
        assert_eq!(
            parse_iso8601_to_nanos("2022-10-12T00:00:00.000Z"),
            Some(OCT_12_2022_NANOS)
        );
        // Offsets are normalised to UTC.
        assert_eq!(
            parse_iso8601_to_nanos("2022-10-12T02:00:00+02:00"),
            Some(OCT_12_2022_NANOS)
        );
        assert_eq!(parse_iso8601_to_nanos("invalid"), None);
    }

    #[test]
    fn test_kv_string() {
        let kv = kv_string("key", "value");
        assert_eq!(kv.key, "key");

        match kv.value.unwrap().value.unwrap() {
            any_value::Value::StringValue(s) => assert_eq!(s, "value"),
            _ => panic!("Expected string value"),
        }
    }

    #[test]
    fn test_create_shutdown_metric() {
        let converter = MetricsConverter::with_defaults();
        let request = converter.create_shutdown_metric("spindown");

        assert_eq!(request.resource_metrics.len(), 1);
        let scope_metrics = &request.resource_metrics[0].scope_metrics;
        assert_eq!(scope_metrics.len(), 1);

        let metrics = &scope_metrics[0].metrics;
        assert_eq!(metrics.len(), 1);

        let metric = &metrics[0];
        assert_eq!(metric.name, "extension.shutdown_count");
        assert_eq!(metric.unit, "{count}");

        let point = single_gauge_point(metric);
        match &point.value {
            Some(Value::AsInt(count)) => assert_eq!(*count, 1),
            other => panic!("Expected integer value of 1, got {other:?}"),
        }
        assert_eq!(string_attr(&point.attributes, "shutdown.reason"), "spindown");
    }

    #[test]
    fn test_shutdown_metric_different_reasons() {
        let converter = MetricsConverter::with_defaults();

        for reason in ["spindown", "timeout", "failure"] {
            let request = converter.create_shutdown_metric(reason);
            // Panics (rather than silently passing) if the shape is wrong.
            let point = single_gauge_point(&metrics_of(&request)[0]);
            assert_eq!(string_attr(&point.attributes, "shutdown.reason"), reason);
        }
    }
}