opentelemetry_spanprocessor_any/trace/
context.rs1use crate::{
3 global,
4 trace::{Span, SpanContext},
5 Context, ContextGuard, KeyValue,
6};
7use futures_util::{sink::Sink, stream::Stream};
8use pin_project::pin_project;
9use std::error::Error;
10use std::sync::Mutex;
11use std::{
12 borrow::Cow,
13 pin::Pin,
14 task::{Context as TaskContext, Poll},
15};
16
17lazy_static::lazy_static! {
18 static ref NOOP_SPAN: SynchronizedSpan = SynchronizedSpan {
19 span_context: SpanContext::empty_context(),
20 inner: None,
21 };
22}
23
24#[derive(Debug)]
26pub struct SpanRef<'a>(&'a SynchronizedSpan);
27
28#[derive(Debug)]
29struct SynchronizedSpan {
30 span_context: SpanContext,
32 inner: Option<Mutex<global::BoxedSpan>>,
34}
35
36impl SpanRef<'_> {
37 fn with_inner_mut<F: FnOnce(&mut global::BoxedSpan)>(&self, f: F) {
38 if let Some(ref inner) = self.0.inner {
39 match inner.lock() {
40 Ok(mut locked) => f(&mut *locked),
41 Err(err) => global::handle_error(err),
42 }
43 }
44 }
45}
46
47impl SpanRef<'_> {
48 pub fn add_event<T>(&self, name: T, attributes: Vec<KeyValue>)
50 where
51 T: Into<Cow<'static, str>>,
52 {
53 self.with_inner_mut(|inner| inner.add_event(name, attributes))
54 }
55
56 pub fn record_exception(&self, err: &dyn Error) {
58 self.with_inner_mut(|inner| inner.record_exception(err))
59 }
60
61 pub fn record_exception_with_stacktrace<T>(&self, err: &dyn Error, stacktrace: T)
63 where
64 T: Into<Cow<'static, str>>,
65 {
66 self.with_inner_mut(|inner| inner.record_exception_with_stacktrace(err, stacktrace))
67 }
68
69 pub fn add_event_with_timestamp<T>(
71 &self,
72 name: T,
73 timestamp: std::time::SystemTime,
74 attributes: Vec<crate::KeyValue>,
75 ) where
76 T: Into<Cow<'static, str>>,
77 {
78 self.with_inner_mut(move |inner| {
79 inner.add_event_with_timestamp(name, timestamp, attributes)
80 })
81 }
82
83 pub fn span_context(&self) -> &SpanContext {
85 &self.0.span_context
86 }
87
88 pub fn is_recording(&self) -> bool {
91 self.0
92 .inner
93 .as_ref()
94 .and_then(|inner| inner.lock().ok().map(|active| active.is_recording()))
95 .unwrap_or(false)
96 }
97
98 pub fn set_attribute(&self, attribute: crate::KeyValue) {
102 self.with_inner_mut(move |inner| inner.set_attribute(attribute))
103 }
104
105 pub fn set_status(&self, code: super::StatusCode, message: String) {
108 self.with_inner_mut(move |inner| inner.set_status(code, message))
109 }
110
111 pub fn update_name<T>(&self, new_name: String)
114 where
115 T: Into<Cow<'static, str>>,
116 {
117 self.with_inner_mut(move |inner| inner.update_name(new_name))
118 }
119
120 pub fn end(&self) {
122 self.end_with_timestamp(crate::time::now());
123 }
124
125 pub fn end_with_timestamp(&self, timestamp: std::time::SystemTime) {
127 self.with_inner_mut(move |inner| inner.end_with_timestamp(timestamp))
128 }
129}
130
131pub trait TraceContextExt {
133 fn current_with_span<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> Self;
137
138 fn with_span<T: crate::trace::Span + Send + Sync + 'static>(&self, span: T) -> Self;
142
143 fn span(&self) -> SpanRef<'_>;
165
166 fn has_active_span(&self) -> bool;
170
171 fn with_remote_span_context(&self, span_context: crate::trace::SpanContext) -> Self;
175}
176
177impl TraceContextExt for Context {
178 fn current_with_span<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> Self {
179 Context::current_with_value(SynchronizedSpan {
180 span_context: span.span_context().clone(),
181 inner: Some(Mutex::new(global::BoxedSpan::new(span))),
182 })
183 }
184
185 fn with_span<T: crate::trace::Span + Send + Sync + 'static>(&self, span: T) -> Self {
186 self.with_value(SynchronizedSpan {
187 span_context: span.span_context().clone(),
188 inner: Some(Mutex::new(global::BoxedSpan::new(span))),
189 })
190 }
191
192 fn span(&self) -> SpanRef<'_> {
193 if let Some(span) = self.get::<SynchronizedSpan>() {
194 SpanRef(span)
195 } else {
196 SpanRef(&*NOOP_SPAN)
197 }
198 }
199
200 fn has_active_span(&self) -> bool {
201 self.get::<SynchronizedSpan>().is_some()
202 }
203
204 fn with_remote_span_context(&self, span_context: crate::trace::SpanContext) -> Self {
205 self.with_value(SynchronizedSpan {
206 span_context,
207 inner: None,
208 })
209 }
210}
211
212#[must_use = "Dropping the guard detaches the context."]
243pub fn mark_span_as_active<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> ContextGuard {
244 let cx = Context::current_with_span(span);
245 cx.attach()
246}
247
248pub fn get_active_span<F, T>(f: F) -> T
272where
273 F: FnOnce(SpanRef<'_>) -> T,
274{
275 f(Context::current().span())
276}
277
278#[pin_project]
280#[derive(Clone, Debug)]
281pub struct WithContext<T> {
282 #[pin]
283 inner: T,
284 otel_cx: Context,
285}
286
287impl<T: Sized> FutureExt for T {}
288
289impl<T: std::future::Future> std::future::Future for WithContext<T> {
290 type Output = T::Output;
291
292 fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
293 let this = self.project();
294 let _guard = this.otel_cx.clone().attach();
295
296 this.inner.poll(task_cx)
297 }
298}
299
300impl<T: Stream> Stream for WithContext<T> {
301 type Item = T::Item;
302
303 fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
304 let this = self.project();
305 let _guard = this.otel_cx.clone().attach();
306 T::poll_next(this.inner, task_cx)
307 }
308}
309
310impl<I, T: Sink<I>> Sink<I> for WithContext<T>
311where
312 T: Sink<I>,
313{
314 type Error = T::Error;
315
316 fn poll_ready(
317 self: Pin<&mut Self>,
318 task_cx: &mut TaskContext<'_>,
319 ) -> Poll<Result<(), Self::Error>> {
320 let this = self.project();
321 let _guard = this.otel_cx.clone().attach();
322 T::poll_ready(this.inner, task_cx)
323 }
324
325 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
326 let this = self.project();
327 let _guard = this.otel_cx.clone().attach();
328 T::start_send(this.inner, item)
329 }
330
331 fn poll_flush(
332 self: Pin<&mut Self>,
333 task_cx: &mut TaskContext<'_>,
334 ) -> Poll<Result<(), Self::Error>> {
335 let this = self.project();
336 let _guard = this.otel_cx.clone().attach();
337 T::poll_flush(this.inner, task_cx)
338 }
339
340 fn poll_close(
341 self: Pin<&mut Self>,
342 task_cx: &mut TaskContext<'_>,
343 ) -> Poll<Result<(), Self::Error>> {
344 let this = self.project();
345 let _enter = this.otel_cx.clone().attach();
346 T::poll_close(this.inner, task_cx)
347 }
348}
349
350pub trait FutureExt: Sized {
352 fn with_context(self, otel_cx: Context) -> WithContext<Self> {
360 WithContext {
361 inner: self,
362 otel_cx,
363 }
364 }
365
366 fn with_current_context(self) -> WithContext<Self> {
374 let otel_cx = Context::current();
375 self.with_context(otel_cx)
376 }
377}