use std::{fmt::Display, future::Future, pin::Pin, task};
use crate::LambdaInvocation;
use opentelemetry_semantic_conventions::attribute;
use pin_project::pin_project;
use tower::{Layer, Service};
use tracing::{field, instrument::Instrumented, Instrument};
/// Tower layer that wraps a service in an [`OpenTelemetryService`].
///
/// Each wrapped service creates a tracing span per Lambda invocation and calls
/// the flush function once the invocation's future has completed.
pub struct OpenTelemetryLayer<F> {
/// Callable cloned into every service built by this layer; the service's
/// response future invokes it after the inner future resolves.
flush_fn: F,
/// Trigger kind recorded on every invocation span; rendered to a string
/// when the layer builds a service.
otel_attribute_trigger: OpenTelemetryFaasTrigger,
}
impl<F> OpenTelemetryLayer<F>
where
F: Fn() + Clone,
{
pub fn new(flush_fn: F) -> Self {
Self {
flush_fn,
otel_attribute_trigger: Default::default(),
}
}
pub fn with_trigger(self, trigger: OpenTelemetryFaasTrigger) -> Self {
Self {
otel_attribute_trigger: trigger,
..self
}
}
}
impl<S, F> Layer<S> for OpenTelemetryLayer<F>
where
    F: Fn() + Clone,
{
    type Service = OpenTelemetryService<S, F>;

    /// Wraps `inner` in an [`OpenTelemetryService`].
    ///
    /// The new service receives its own clone of the flush function, the
    /// trigger rendered as a string, and starts with `coldstart` set to `true`.
    fn layer(&self, inner: S) -> Self::Service {
        let Self {
            flush_fn,
            otel_attribute_trigger,
        } = self;
        OpenTelemetryService {
            inner,
            flush_fn: flush_fn.clone(),
            coldstart: true,
            otel_attribute_trigger: otel_attribute_trigger.to_string(),
        }
    }
}
/// Service produced by [`OpenTelemetryLayer`]: runs the inner service inside a
/// per-invocation tracing span and flushes once the invocation has finished.
pub struct OpenTelemetryService<S, F> {
/// The wrapped service that actually handles the invocation.
inner: S,
/// Callable cloned into each returned future and invoked when that future
/// completes.
flush_fn: F,
/// `true` until the first `call`; recorded on the span and then cleared.
coldstart: bool,
/// Trigger attribute value, pre-rendered to a string by the layer.
otel_attribute_trigger: String,
}
/// Runs each invocation of the inner service inside a dedicated tracing span
/// and returns a future that calls the flush function on completion.
impl<S, F> Service<LambdaInvocation> for OpenTelemetryService<S, F>
where
    S: Service<LambdaInvocation, Response = ()>,
    F: Fn() + Clone,
{
    type Error = S::Error;
    type Response = ();
    type Future = OpenTelemetryFuture<Instrumented<S::Future>, F>;

    /// Readiness is delegated unchanged to the wrapped service.
    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Builds the invocation span, calls the inner service inside it, and
    /// wraps the resulting future so the span stays attached while it runs.
    fn call(&mut self, req: LambdaInvocation) -> Self::Future {
        // The `tenant_id` field is only declared on the span when the
        // invocation context carries one, hence the two macro invocations.
        let span = match &req.context.tenant_id {
            Some(tenant_id) => tracing::info_span!(
                "Lambda function invocation",
                "otel.name" = req.context.env_config.function_name,
                "otel.kind" = field::Empty,
                { attribute::FAAS_TRIGGER } = &self.otel_attribute_trigger,
                { attribute::FAAS_INVOCATION_ID } = req.context.request_id,
                { attribute::FAAS_COLDSTART } = self.coldstart,
                "tenant_id" = tenant_id
            ),
            None => tracing::info_span!(
                "Lambda function invocation",
                "otel.name" = req.context.env_config.function_name,
                "otel.kind" = field::Empty,
                { attribute::FAAS_TRIGGER } = &self.otel_attribute_trigger,
                { attribute::FAAS_INVOCATION_ID } = req.context.request_id,
                { attribute::FAAS_COLDSTART } = self.coldstart
            ),
        };

        // Only the first invocation handled by this service is reported as a
        // cold start.
        self.coldstart = false;

        // Call the inner service while the span is entered, so anything it
        // does synchronously inside `call` happens within this span.
        let inner_future = span.in_scope(|| self.inner.call(req));

        OpenTelemetryFuture {
            future: Some(inner_future.instrument(span)),
            flush_fn: self.flush_fn.clone(),
        }
    }
}
/// Future returned by [`OpenTelemetryService`]: resolves to the wrapped
/// future's output and invokes the flush function once that output is ready.
#[pin_project]
pub struct OpenTelemetryFuture<Fut, F> {
/// The wrapped future. Held in an `Option` so it can be replaced with `None`
/// (dropping it) as soon as it completes, before the flush function runs.
#[pin]
future: Option<Fut>,
/// Callable invoked exactly when the wrapped future reports readiness.
flush_fn: F,
}
impl<Fut, F> Future for OpenTelemetryFuture<Fut, F>
where
    Fut: Future,
    F: Fn(),
{
    type Output = Fut::Output;

    /// Polls the wrapped future; once it is ready, drops it, calls the flush
    /// function, and returns the output.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let inner = self
            .as_mut()
            .project()
            .future
            .as_pin_mut()
            .expect("future polled after completion");

        let output = match inner.poll(cx) {
            task::Poll::Ready(value) => value,
            task::Poll::Pending => return task::Poll::Pending,
        };

        // Order matters: the wrapped future is dropped first, and only then is
        // the flush function invoked.
        let mut this = self.project();
        this.future.set(None);
        (this.flush_fn)();

        task::Poll::Ready(output)
    }
}
/// Kind of trigger recorded on each invocation span under the
/// `FAAS_TRIGGER` attribute.
///
/// The variant descriptions follow the OpenTelemetry FaaS semantic
/// conventions for `faas.trigger`. The `Display` implementation renders each
/// variant as the corresponding lowercase value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum OpenTelemetryFaasTrigger {
    /// A response to a data source operation such as a database or filesystem
    /// read/write. This is the default.
    #[default]
    Datasource,
    /// An answer to an inbound HTTP request.
    Http,
    /// Execution caused by messages sent to a messaging system.
    PubSub,
    /// Execution scheduled to happen regularly.
    Timer,
    /// None of the other kinds apply.
    Other,
}
/// Renders the trigger as its lowercase attribute value (`"datasource"`,
/// `"http"`, `"pubsub"`, `"timer"` or `"other"`).
impl Display for OpenTelemetryFaasTrigger {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            OpenTelemetryFaasTrigger::Datasource => "datasource",
            OpenTelemetryFaasTrigger::Http => "http",
            OpenTelemetryFaasTrigger::PubSub => "pubsub",
            OpenTelemetryFaasTrigger::Timer => "timer",
            OpenTelemetryFaasTrigger::Other => "other",
        };
        // `pad` honours width/fill/alignment flags (e.g. `{:>8}`), which a
        // bare `write!` of a literal silently ignores; plain `{}` output is
        // identical either way.
        f.pad(label)
    }
}