opentelemetry_spanprocessor_any/trace/
context.rs

1//! Context extensions for tracing
2use crate::{
3    global,
4    trace::{Span, SpanContext},
5    Context, ContextGuard, KeyValue,
6};
7use futures_util::{sink::Sink, stream::Stream};
8use pin_project::pin_project;
9use std::error::Error;
10use std::sync::Mutex;
11use std::{
12    borrow::Cow,
13    pin::Pin,
14    task::{Context as TaskContext, Poll},
15};
16
17lazy_static::lazy_static! {
18    static ref NOOP_SPAN: SynchronizedSpan = SynchronizedSpan {
19        span_context: SpanContext::empty_context(),
20        inner: None,
21    };
22}
23
24/// A reference to the currently active span in this context.
25#[derive(Debug)]
26pub struct SpanRef<'a>(&'a SynchronizedSpan);
27
28#[derive(Debug)]
29struct SynchronizedSpan {
30    /// Immutable span context
31    span_context: SpanContext,
32    /// Mutable span inner that requires synchronization
33    inner: Option<Mutex<global::BoxedSpan>>,
34}
35
36impl SpanRef<'_> {
37    fn with_inner_mut<F: FnOnce(&mut global::BoxedSpan)>(&self, f: F) {
38        if let Some(ref inner) = self.0.inner {
39            match inner.lock() {
40                Ok(mut locked) => f(&mut *locked),
41                Err(err) => global::handle_error(err),
42            }
43        }
44    }
45}
46
47impl SpanRef<'_> {
48    /// An API to record events in the context of a given `Span`.
49    pub fn add_event<T>(&self, name: T, attributes: Vec<KeyValue>)
50    where
51        T: Into<Cow<'static, str>>,
52    {
53        self.with_inner_mut(|inner| inner.add_event(name, attributes))
54    }
55
56    /// Convenience method to record an exception/error as an `Event`
57    pub fn record_exception(&self, err: &dyn Error) {
58        self.with_inner_mut(|inner| inner.record_exception(err))
59    }
60
61    /// Convenience method to record a exception/error as an `Event` with custom stacktrace
62    pub fn record_exception_with_stacktrace<T>(&self, err: &dyn Error, stacktrace: T)
63    where
64        T: Into<Cow<'static, str>>,
65    {
66        self.with_inner_mut(|inner| inner.record_exception_with_stacktrace(err, stacktrace))
67    }
68
69    /// An API to record events at a specific time in the context of a given `Span`.
70    pub fn add_event_with_timestamp<T>(
71        &self,
72        name: T,
73        timestamp: std::time::SystemTime,
74        attributes: Vec<crate::KeyValue>,
75    ) where
76        T: Into<Cow<'static, str>>,
77    {
78        self.with_inner_mut(move |inner| {
79            inner.add_event_with_timestamp(name, timestamp, attributes)
80        })
81    }
82
83    /// Returns the `SpanContext` for the given `Span`.
84    pub fn span_context(&self) -> &SpanContext {
85        &self.0.span_context
86    }
87
88    /// Returns true if this `Span` is recording information like events with the `add_event`
89    /// operation, attributes using `set_attributes`, status with `set_status`, etc.
90    pub fn is_recording(&self) -> bool {
91        self.0
92            .inner
93            .as_ref()
94            .and_then(|inner| inner.lock().ok().map(|active| active.is_recording()))
95            .unwrap_or(false)
96    }
97
98    /// An API to set a single `Attribute` where the attribute properties are passed
99    /// as arguments. To avoid extra allocations some implementations may offer a separate API for
100    /// each of the possible value types.
101    pub fn set_attribute(&self, attribute: crate::KeyValue) {
102        self.with_inner_mut(move |inner| inner.set_attribute(attribute))
103    }
104
105    /// Sets the status of the `Span`. If used, this will override the default `Span`
106    /// status, which is `Unset`. `message` MUST be ignored when the status is `OK` or `Unset`
107    pub fn set_status(&self, code: super::StatusCode, message: String) {
108        self.with_inner_mut(move |inner| inner.set_status(code, message))
109    }
110
111    /// Updates the `Span`'s name. After this update, any sampling behavior based on the
112    /// name will depend on the implementation.
113    pub fn update_name<T>(&self, new_name: String)
114    where
115        T: Into<Cow<'static, str>>,
116    {
117        self.with_inner_mut(move |inner| inner.update_name(new_name))
118    }
119
120    /// Finishes the `Span`.
121    pub fn end(&self) {
122        self.end_with_timestamp(crate::time::now());
123    }
124
125    /// Finishes the `Span` with given timestamp
126    pub fn end_with_timestamp(&self, timestamp: std::time::SystemTime) {
127        self.with_inner_mut(move |inner| inner.end_with_timestamp(timestamp))
128    }
129}
130
131/// Methods for storing and retrieving trace data in a context.
132pub trait TraceContextExt {
133    /// Returns a clone of the current context with the included span.
134    ///
135    /// This is useful for building tracers.
136    fn current_with_span<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> Self;
137
138    /// Returns a clone of this context with the included span.
139    ///
140    /// This is useful for building tracers.
141    fn with_span<T: crate::trace::Span + Send + Sync + 'static>(&self, span: T) -> Self;
142
143    /// Returns a reference to this context's span, or the default no-op span if
144    /// none has been set.
145    ///
146    /// # Examples
147    ///
148    /// ```
149    /// use opentelemetry::{
150    ///     sdk,
151    ///     trace::{SpanContext, TraceContextExt, Tracer, TracerProvider},
152    ///     Context,
153    /// };
154    ///
155    /// // returns a reference to an empty span by default
156    /// assert_eq!(Context::current().span().span_context(), &SpanContext::empty_context());
157    ///
158    /// let provider = sdk::trace::TracerProvider::default();
159    /// provider.tracer("my-component").in_span("my-span", |cx| {
160    ///     // Returns a reference to the current span if set
161    ///     assert_ne!(cx.span().span_context(), &SpanContext::empty_context());
162    /// });
163    /// ```
164    fn span(&self) -> SpanRef<'_>;
165
166    /// Used to see if a span has been marked as active
167    ///
168    /// This is useful for building tracers.
169    fn has_active_span(&self) -> bool;
170
171    /// Returns a copy of this context with the span context included.
172    ///
173    /// This is useful for building propagators.
174    fn with_remote_span_context(&self, span_context: crate::trace::SpanContext) -> Self;
175}
176
177impl TraceContextExt for Context {
178    fn current_with_span<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> Self {
179        Context::current_with_value(SynchronizedSpan {
180            span_context: span.span_context().clone(),
181            inner: Some(Mutex::new(global::BoxedSpan::new(span))),
182        })
183    }
184
185    fn with_span<T: crate::trace::Span + Send + Sync + 'static>(&self, span: T) -> Self {
186        self.with_value(SynchronizedSpan {
187            span_context: span.span_context().clone(),
188            inner: Some(Mutex::new(global::BoxedSpan::new(span))),
189        })
190    }
191
192    fn span(&self) -> SpanRef<'_> {
193        if let Some(span) = self.get::<SynchronizedSpan>() {
194            SpanRef(span)
195        } else {
196            SpanRef(&*NOOP_SPAN)
197        }
198    }
199
200    fn has_active_span(&self) -> bool {
201        self.get::<SynchronizedSpan>().is_some()
202    }
203
204    fn with_remote_span_context(&self, span_context: crate::trace::SpanContext) -> Self {
205        self.with_value(SynchronizedSpan {
206            span_context,
207            inner: None,
208        })
209    }
210}
211
212/// Mark a given `Span` as active.
213///
214/// The `Tracer` MUST provide a way to update its active `Span`, and MAY provide convenience
215/// methods to manage a `Span`'s lifetime and the scope in which a `Span` is active. When an
216/// active `Span` is made inactive, the previously-active `Span` SHOULD be made active. A `Span`
217/// maybe finished (i.e. have a non-null end time) but still be active. A `Span` may be active
218/// on one thread after it has been made inactive on another.
219///
220/// # Examples
221///
222/// ```
223/// use opentelemetry::{global, trace::{Span, Tracer}, KeyValue};
224/// use opentelemetry::trace::{get_active_span, mark_span_as_active};
225///
226/// fn my_function() {
227///     let tracer = global::tracer("my-component-a");
228///     // start an active span in one function
229///     let span = tracer.start("span-name");
230///     let _guard = mark_span_as_active(span);
231///     // anything happening in functions we call can still access the active span...
232///     my_other_function();
233/// }
234///
235/// fn my_other_function() {
236///     // call methods on the current span from
237///     get_active_span(|span| {
238///         span.add_event("An event!".to_string(), vec![KeyValue::new("happened", true)]);
239///     });
240/// }
241/// ```
242#[must_use = "Dropping the guard detaches the context."]
243pub fn mark_span_as_active<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> ContextGuard {
244    let cx = Context::current_with_span(span);
245    cx.attach()
246}
247
248/// Executes a closure with a reference to this thread's current span.
249///
250/// # Examples
251///
252/// ```
253/// use opentelemetry::{global, trace::{Span, Tracer}, KeyValue};
254/// use opentelemetry::trace::get_active_span;
255///
256/// fn my_function() {
257///     // start an active span in one function
258///     global::tracer("my-component").in_span("span-name", |_cx| {
259///         // anything happening in functions we call can still access the active span...
260///         my_other_function();
261///     })
262/// }
263///
264/// fn my_other_function() {
265///     // call methods on the current span from
266///     get_active_span(|span| {
267///         span.add_event("An event!".to_string(), vec![KeyValue::new("happened", true)]);
268///     })
269/// }
270/// ```
271pub fn get_active_span<F, T>(f: F) -> T
272where
273    F: FnOnce(SpanRef<'_>) -> T,
274{
275    f(Context::current().span())
276}
277
278/// A future, stream, or sink that has an associated context.
279#[pin_project]
280#[derive(Clone, Debug)]
281pub struct WithContext<T> {
282    #[pin]
283    inner: T,
284    otel_cx: Context,
285}
286
287impl<T: Sized> FutureExt for T {}
288
289impl<T: std::future::Future> std::future::Future for WithContext<T> {
290    type Output = T::Output;
291
292    fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
293        let this = self.project();
294        let _guard = this.otel_cx.clone().attach();
295
296        this.inner.poll(task_cx)
297    }
298}
299
300impl<T: Stream> Stream for WithContext<T> {
301    type Item = T::Item;
302
303    fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
304        let this = self.project();
305        let _guard = this.otel_cx.clone().attach();
306        T::poll_next(this.inner, task_cx)
307    }
308}
309
310impl<I, T: Sink<I>> Sink<I> for WithContext<T>
311where
312    T: Sink<I>,
313{
314    type Error = T::Error;
315
316    fn poll_ready(
317        self: Pin<&mut Self>,
318        task_cx: &mut TaskContext<'_>,
319    ) -> Poll<Result<(), Self::Error>> {
320        let this = self.project();
321        let _guard = this.otel_cx.clone().attach();
322        T::poll_ready(this.inner, task_cx)
323    }
324
325    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
326        let this = self.project();
327        let _guard = this.otel_cx.clone().attach();
328        T::start_send(this.inner, item)
329    }
330
331    fn poll_flush(
332        self: Pin<&mut Self>,
333        task_cx: &mut TaskContext<'_>,
334    ) -> Poll<Result<(), Self::Error>> {
335        let this = self.project();
336        let _guard = this.otel_cx.clone().attach();
337        T::poll_flush(this.inner, task_cx)
338    }
339
340    fn poll_close(
341        self: Pin<&mut Self>,
342        task_cx: &mut TaskContext<'_>,
343    ) -> Poll<Result<(), Self::Error>> {
344        let this = self.project();
345        let _enter = this.otel_cx.clone().attach();
346        T::poll_close(this.inner, task_cx)
347    }
348}
349
350/// Extension trait allowing futures, streams, and sinks to be traced with a span.
351pub trait FutureExt: Sized {
352    /// Attaches the provided [`Context`] to this type, returning a `WithContext`
353    /// wrapper.
354    ///
355    /// When the wrapped type is a future, stream, or sink, the attached context
356    /// will be set as current while it is being polled.
357    ///
358    /// [`Context`]: crate::Context
359    fn with_context(self, otel_cx: Context) -> WithContext<Self> {
360        WithContext {
361            inner: self,
362            otel_cx,
363        }
364    }
365
366    /// Attaches the current [`Context`] to this type, returning a `WithContext`
367    /// wrapper.
368    ///
369    /// When the wrapped type is a future, stream, or sink, the attached context
370    /// will be set as the default while it is being polled.
371    ///
372    /// [`Context`]: crate::Context
373    fn with_current_context(self) -> WithContext<Self> {
374        let otel_cx = Context::current();
375        self.with_context(otel_cx)
376    }
377}