use crate::{
global,
trace::{Span, SpanContext, Status},
Context, ContextGuard, KeyValue,
};
use futures_util::{sink::Sink, stream::Stream};
use once_cell::sync::Lazy;
use pin_project_lite::pin_project;
use std::{
borrow::Cow,
error::Error,
pin::Pin,
sync::Mutex,
task::{Context as TaskContext, Poll},
};
static NOOP_SPAN: Lazy<SynchronizedSpan> = Lazy::new(|| SynchronizedSpan {
span_context: SpanContext::empty_context(),
inner: None,
});
#[derive(Debug)]
pub struct SpanRef<'a>(&'a SynchronizedSpan);
#[derive(Debug)]
struct SynchronizedSpan {
span_context: SpanContext,
inner: Option<Mutex<global::BoxedSpan>>,
}
impl SpanRef<'_> {
fn with_inner_mut<F: FnOnce(&mut global::BoxedSpan)>(&self, f: F) {
if let Some(ref inner) = self.0.inner {
match inner.lock() {
Ok(mut locked) => f(&mut locked),
Err(err) => global::handle_error(err),
}
}
}
}
impl SpanRef<'_> {
pub fn add_event<T>(&self, name: T, attributes: Vec<KeyValue>)
where
T: Into<Cow<'static, str>>,
{
self.with_inner_mut(|inner| inner.add_event(name, attributes))
}
pub fn record_error(&self, err: &dyn Error) {
self.with_inner_mut(|inner| inner.record_error(err))
}
pub fn add_event_with_timestamp<T>(
&self,
name: T,
timestamp: std::time::SystemTime,
attributes: Vec<crate::KeyValue>,
) where
T: Into<Cow<'static, str>>,
{
self.with_inner_mut(move |inner| {
inner.add_event_with_timestamp(name, timestamp, attributes)
})
}
pub fn span_context(&self) -> &SpanContext {
&self.0.span_context
}
pub fn is_recording(&self) -> bool {
self.0
.inner
.as_ref()
.and_then(|inner| inner.lock().ok().map(|active| active.is_recording()))
.unwrap_or(false)
}
pub fn set_attribute(&self, attribute: crate::KeyValue) {
self.with_inner_mut(move |inner| inner.set_attribute(attribute))
}
pub fn set_attributes(&self, attributes: impl IntoIterator<Item = KeyValue>) {
self.with_inner_mut(move |inner| inner.set_attributes(attributes))
}
pub fn set_status(&self, status: Status) {
self.with_inner_mut(move |inner| inner.set_status(status))
}
pub fn update_name<T>(&self, new_name: T)
where
T: Into<Cow<'static, str>>,
{
self.with_inner_mut(move |inner| inner.update_name(new_name))
}
pub fn end(&self) {
self.end_with_timestamp(crate::time::now());
}
pub fn end_with_timestamp(&self, timestamp: std::time::SystemTime) {
self.with_inner_mut(move |inner| inner.end_with_timestamp(timestamp))
}
}
pub trait TraceContextExt {
fn current_with_span<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> Self;
fn with_span<T: crate::trace::Span + Send + Sync + 'static>(&self, span: T) -> Self;
fn span(&self) -> SpanRef<'_>;
fn has_active_span(&self) -> bool;
fn with_remote_span_context(&self, span_context: crate::trace::SpanContext) -> Self;
}
impl TraceContextExt for Context {
fn current_with_span<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> Self {
Context::current_with_value(SynchronizedSpan {
span_context: span.span_context().clone(),
inner: Some(Mutex::new(global::BoxedSpan::new(span))),
})
}
fn with_span<T: crate::trace::Span + Send + Sync + 'static>(&self, span: T) -> Self {
self.with_value(SynchronizedSpan {
span_context: span.span_context().clone(),
inner: Some(Mutex::new(global::BoxedSpan::new(span))),
})
}
fn span(&self) -> SpanRef<'_> {
if let Some(span) = self.get::<SynchronizedSpan>() {
SpanRef(span)
} else {
SpanRef(&NOOP_SPAN)
}
}
fn has_active_span(&self) -> bool {
self.get::<SynchronizedSpan>().is_some()
}
fn with_remote_span_context(&self, span_context: crate::trace::SpanContext) -> Self {
self.with_value(SynchronizedSpan {
span_context,
inner: None,
})
}
}
#[must_use = "Dropping the guard detaches the context."]
pub fn mark_span_as_active<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> ContextGuard {
let cx = Context::current_with_span(span);
cx.attach()
}
pub fn get_active_span<F, T>(f: F) -> T
where
F: FnOnce(SpanRef<'_>) -> T,
{
Context::map_current(|cx| f(cx.span()))
}
pin_project! {
#[derive(Clone, Debug)]
pub struct WithContext<T> {
#[pin]
inner: T,
otel_cx: Context,
}
}
impl<T: Sized> FutureExt for T {}
impl<T: std::future::Future> std::future::Future for WithContext<T> {
type Output = T::Output;
fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
this.inner.poll(task_cx)
}
}
impl<T: Stream> Stream for WithContext<T> {
type Item = T::Item;
fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_next(this.inner, task_cx)
}
}
impl<I, T: Sink<I>> Sink<I> for WithContext<T>
where
T: Sink<I>,
{
type Error = T::Error;
fn poll_ready(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_ready(this.inner, task_cx)
}
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::start_send(this.inner, item)
}
fn poll_flush(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_flush(this.inner, task_cx)
}
fn poll_close(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _enter = this.otel_cx.clone().attach();
T::poll_close(this.inner, task_cx)
}
}
pub trait FutureExt: Sized {
fn with_context(self, otel_cx: Context) -> WithContext<Self> {
WithContext {
inner: self,
otel_cx,
}
}
fn with_current_context(self) -> WithContext<Self> {
let otel_cx = Context::current();
self.with_context(otel_cx)
}
}