lambda_otel_utils/vended/
lambda_runtime_otel.rs1use std::{fmt::Display, future::Future, pin::Pin, task};
15
16use lambda_runtime::tower::{Layer, Service};
17use lambda_runtime::LambdaInvocation;
18use opentelemetry_semantic_conventions::trace as traceconv;
19use pin_project::pin_project;
20use tracing::{field, instrument::Instrumented, Instrument};
21pub struct OpenTelemetryLayer<F> {
24 flush_fn: F,
25 otel_attribute_trigger: OpenTelemetryFaasTrigger,
26}
27
28impl<F> OpenTelemetryLayer<F>
29where
30 F: Fn() + Clone,
31{
32 pub fn new(flush_fn: F) -> Self {
34 Self {
35 flush_fn,
36 otel_attribute_trigger: Default::default(),
37 }
38 }
39
40 pub fn with_trigger(self, trigger: OpenTelemetryFaasTrigger) -> Self {
42 Self {
43 otel_attribute_trigger: trigger,
44 ..self
45 }
46 }
47}
48
49impl<S, F> Layer<S> for OpenTelemetryLayer<F>
50where
51 F: Fn() + Clone,
52{
53 type Service = OpenTelemetryService<S, F>;
54
55 fn layer(&self, inner: S) -> Self::Service {
56 OpenTelemetryService {
57 inner,
58 flush_fn: self.flush_fn.clone(),
59 coldstart: true,
60 otel_attribute_trigger: self.otel_attribute_trigger.to_string(),
61 }
62 }
63}
64
65pub struct OpenTelemetryService<S, F> {
67 inner: S,
68 flush_fn: F,
69 coldstart: bool,
70 otel_attribute_trigger: String,
71}
72
73impl<S, F> Service<LambdaInvocation> for OpenTelemetryService<S, F>
74where
75 S: Service<LambdaInvocation, Response = ()>,
76 F: Fn() + Clone,
77{
78 type Error = S::Error;
79 type Response = ();
80 type Future = OpenTelemetryFuture<Instrumented<S::Future>, F>;
81
82 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
83 self.inner.poll_ready(cx)
84 }
85
86 fn call(&mut self, req: LambdaInvocation) -> Self::Future {
87 let span = tracing::info_span!(
88 "Lambda function invocation",
89 "otel.name" = req.context.env_config.function_name,
90 "otel.kind" = field::Empty,
91 { traceconv::FAAS_TRIGGER } = &self.otel_attribute_trigger,
92 { traceconv::FAAS_INVOCATION_ID } = req.context.request_id,
93 { traceconv::FAAS_COLDSTART } = self.coldstart
94 );
95
96 self.coldstart = false;
98
99 let future = {
100 let _guard = span.enter();
103 self.inner.call(req)
104 };
105 OpenTelemetryFuture {
106 future: Some(future.instrument(span)),
107 flush_fn: self.flush_fn.clone(),
108 }
109 }
110}
111
112#[pin_project]
114pub struct OpenTelemetryFuture<Fut, F> {
115 #[pin]
116 future: Option<Fut>,
117 flush_fn: F,
118}
119
120impl<Fut, F> Future for OpenTelemetryFuture<Fut, F>
121where
122 Fut: Future,
123 F: Fn(),
124{
125 type Output = Fut::Output;
126
127 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
128 let ready = task::ready!(self
130 .as_mut()
131 .project()
132 .future
133 .as_pin_mut()
134 .expect("future polled after completion")
135 .poll(cx));
136
137 Pin::set(&mut self.as_mut().project().future, None);
140 (self.project().flush_fn)();
141 task::Poll::Ready(ready)
142 }
143}
144
145#[derive(Default, Clone, Copy)]
148#[non_exhaustive]
149pub enum OpenTelemetryFaasTrigger {
150 #[default]
152 Datasource,
153 Http,
155 PubSub,
157 Timer,
159 Other,
161}
162
163impl Display for OpenTelemetryFaasTrigger {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 match self {
166 OpenTelemetryFaasTrigger::Datasource => write!(f, "datasource"),
167 OpenTelemetryFaasTrigger::Http => write!(f, "http"),
168 OpenTelemetryFaasTrigger::PubSub => write!(f, "pubsub"),
169 OpenTelemetryFaasTrigger::Timer => write!(f, "timer"),
170 OpenTelemetryFaasTrigger::Other => write!(f, "other"),
171 }
172 }
173}