use std::{env, mem::ManuallyDrop, pin::Pin, task};
use opentelemetry::{
Context, KeyValue,
trace::{SpanBuilder, SpanContext, SpanKind, TraceContextExt, TraceState, Tracer},
};
use opentelemetry_semantic_conventions::attribute as semco;
use pin_project::{pin_project, pinned_drop};
use tokio::task::futures::TaskLocalFuture;
use super::{InstrumentedFuture, Instrumentor, utils::XRayTraceHeader};
/// Zero-sized handle for the OpenTelemetry-backed [`Instrumentor`] implementation.
///
/// The type carries no state; everything it does is exposed through the
/// associated functions of its `Instrumentor` impl.
#[derive(Debug, Clone)]
pub struct OtelInstrumentor;
/// Future wrapper that attaches a stored OpenTelemetry [`Context`] around every
/// poll of the wrapped future and around its destruction.
///
/// The struct uses a custom pinned drop (see the `PinnedDrop` impl) so the
/// wrapped future is dropped while `ctx` is attached.
#[pin_project(PinnedDrop)]
pub struct OtelInstrumentedFuture<Fut: Future> {
// The wrapped future. It lives in `ManuallyDrop` because the pinned drop
// handler drops it explicitly, after attaching `ctx`.
#[pin]
future: ManuallyDrop<Fut>,
// Context that is cloned and attached for the duration of each poll and of
// the wrapped future's drop.
ctx: Context,
}
/// Drives the wrapped future, keeping the stored context attached for the
/// duration of each individual `poll` call.
impl<Fut: Future> Future for OtelInstrumentedFuture<Fut> {
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let fields = self.project();

        // The guard lives until this function returns, i.e. until the inner
        // poll below has finished; it is released when it goes out of scope.
        let _attached = fields.ctx.clone().attach();

        // SAFETY: `future` is a `#[pin]` field, so it is structurally pinned.
        // The closure only re-borrows through the `ManuallyDrop` wrapper and
        // never moves the inner future. The inner value is still alive here
        // because it is only dropped by the pinned drop handler.
        let inner = unsafe { fields.future.map_unchecked_mut(|slot| &mut **slot) };

        inner.poll(cx)
    }
}
#[pinned_drop]
impl<Fut: Future> PinnedDrop for OtelInstrumentedFuture<Fut> {
fn drop(self: std::pin::Pin<&mut Self>) {
let this = self.project();
let _guard = this.ctx.clone().attach();
unsafe { ManuallyDrop::drop(this.future.get_unchecked_mut()) }
}
}
// Registers the wrapper with the project's `InstrumentedFuture` trait.
impl<Fut: Future> InstrumentedFuture for OtelInstrumentedFuture<Fut> {
// The wrapper names itself as the associated future type.
// NOTE(review): the exact contract of `InstrumentedFuture::Fut` is defined in
// the parent module and is not visible here — confirm against the trait.
type Fut = Self;
}
// Task-local slot holding the context of the current invocation.
// It is populated via `INVOCATION_CTX.scope(..)` in `instrument` and `spawn`,
// and read via `INVOCATION_CTX.with(..)` in `with_invocation_span` and `spawn`.
tokio::task_local! {
static INVOCATION_CTX: Context;
}
impl Instrumentor for OtelInstrumentor {
type IFut<F: Future> = OtelInstrumentedFuture<TaskLocalFuture<Context, F>>;
type InvocationSpan = Context;
fn instrument<F: Future>(inner: F, context: super::InvocationContext) -> Self::IFut<F> {
let tracer = opentelemetry::global::tracer(env!("CARGO_PKG_NAME"));
let span_builder = SpanBuilder::from_name(
env::var("AWS_LAMBDA_FUNCTION_NAME")
.ok()
.unwrap_or_default(),
)
.with_kind(SpanKind::Server)
.with_attributes([
KeyValue::new(semco::FAAS_TRIGGER, context.trigger.to_string()),
KeyValue::new(semco::CLOUD_RESOURCE_ID, context.function_arn),
KeyValue::new(semco::FAAS_INVOCATION_ID, context.request_id),
KeyValue::new(semco::CLOUD_ACCOUNT_ID, context.account_id),
KeyValue::new(semco::FAAS_COLDSTART, context.is_coldstart),
]);
let ctx = if let Some(XRayTraceHeader {
trace_id,
parent_id,
sampled,
}) = context.xray_trace_header
{
let otel_context = opentelemetry::Context::new().with_remote_span_context(
SpanContext::new(trace_id, parent_id, sampled, true, TraceState::NONE),
);
let span = tracer.build_with_context(
span_builder
.with_trace_id(trace_id)
.with_attributes([KeyValue::new("xray_trace_id", trace_id.to_string())]),
&otel_context,
);
otel_context.with_span(span)
} else {
let span = tracer.build_with_context(span_builder, &opentelemetry::Context::new());
opentelemetry::Context::new().with_span(span)
};
let inner = INVOCATION_CTX.scope(ctx.clone(), inner);
OtelInstrumentedFuture {
future: ManuallyDrop::new(inner),
ctx,
}
}
fn with_invocation_span(f: impl FnOnce(&mut Self::InvocationSpan)) {
INVOCATION_CTX.with(|ctx| {
let mut ctx = ctx.clone();
f(&mut ctx);
});
}
fn spawn<F>(future: F) -> tokio::task::JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let invocation_ctx = INVOCATION_CTX.with(|ctx| ctx.clone());
tokio::spawn(INVOCATION_CTX.scope(invocation_ctx, future))
}
}