opentelemetry_spanprocessor_any/sdk/trace/
runtime.rs

1//! # Trace Runtime
2//! Trace runtime is an extension to [`Runtime`]. Currently it provides a channel that used
3//! by [`BatchSpanProcessor`].
4//!
5//! [`BatchSpanProcessor`]: crate::sdk::trace::BatchSpanProcessor
6//! [`Runtime`]: crate::runtime::Runtime
7#[cfg(feature = "rt-async-std")]
8use crate::runtime::AsyncStd;
9use crate::runtime::Runtime;
10#[cfg(feature = "rt-tokio")]
11use crate::runtime::Tokio;
12#[cfg(feature = "rt-tokio-current-thread")]
13use crate::runtime::TokioCurrentThread;
14use crate::sdk::trace::BatchMessage;
15use crate::trace::TraceError;
16use futures_util::stream::Stream;
17use std::fmt::Debug;
18
19#[cfg(any(
20    feature = "rt-tokio",
21    feature = "rt-tokio-current-thread",
22    feature = "rt-async-std"
23))]
24const CHANNEL_FULL_ERROR: &str =
25    "cannot send span to the batch span processor because the channel is full";
26#[cfg(any(
27    feature = "rt-tokio",
28    feature = "rt-tokio-current-thread",
29    feature = "rt-async-std"
30))]
31const CHANNEL_CLOSED_ERROR: &str =
32    "cannot send span to the batch span processor because the channel is closed";
33
34/// Trace runtime is an extension to [`Runtime`]. Currently it provides a channel that used
35/// by [`BatchSpanProcessor`].
36///
37/// [`BatchSpanProcessor`]: crate::sdk::trace::BatchSpanProcessor
38/// [`Runtime`]: crate::runtime::Runtime
39pub trait TraceRuntime: Runtime {
40    /// A future stream to receive the batch messages from channels.
41    type Receiver: Stream<Item = BatchMessage> + Send;
42
43    /// A batch messages sender that could be sent across thread safely.
44    type Sender: TrySend + Debug;
45
46    /// Return the sender and receiver used to send batch message between tasks.
47    fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver);
48}
49
50/// TrySend is an abstraction of sender that is capable to send BatchMessage with reference.
51pub trait TrySend: Sync + Send {
52    /// Try to send one batch message to worker thread.
53    ///
54    /// It can fail because either the receiver has closed or the buffer is full.
55    fn try_send(&self, item: BatchMessage) -> Result<(), TraceError>;
56}
57
58#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
59impl TrySend for tokio::sync::mpsc::Sender<BatchMessage> {
60    fn try_send(&self, item: BatchMessage) -> Result<(), TraceError> {
61        self.try_send(item).map_err(|err| match err {
62            tokio::sync::mpsc::error::TrySendError::Full(_) => TraceError::from(CHANNEL_FULL_ERROR),
63            tokio::sync::mpsc::error::TrySendError::Closed(_) => {
64                TraceError::from(CHANNEL_CLOSED_ERROR)
65            }
66        })
67    }
68}
69
70#[cfg(feature = "rt-tokio")]
71#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
72impl TraceRuntime for Tokio {
73    type Receiver = tokio_stream::wrappers::ReceiverStream<BatchMessage>;
74    type Sender = tokio::sync::mpsc::Sender<BatchMessage>;
75
76    fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
77        let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
78        (
79            sender,
80            tokio_stream::wrappers::ReceiverStream::new(receiver),
81        )
82    }
83}
84
85#[cfg(feature = "rt-tokio-current-thread")]
86#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
87impl TraceRuntime for TokioCurrentThread {
88    type Receiver = tokio_stream::wrappers::ReceiverStream<BatchMessage>;
89    type Sender = tokio::sync::mpsc::Sender<BatchMessage>;
90
91    fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
92        let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
93        (
94            sender,
95            tokio_stream::wrappers::ReceiverStream::new(receiver),
96        )
97    }
98}
99
100#[cfg(feature = "rt-async-std")]
101impl TrySend for async_std::channel::Sender<BatchMessage> {
102    fn try_send(&self, item: BatchMessage) -> Result<(), TraceError> {
103        self.try_send(item).map_err(|err| match err {
104            async_std::channel::TrySendError::Full(_) => TraceError::from(CHANNEL_FULL_ERROR),
105            async_std::channel::TrySendError::Closed(_) => TraceError::from(CHANNEL_CLOSED_ERROR),
106        })
107    }
108}
109
110#[cfg(feature = "rt-async-std")]
111#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
112impl TraceRuntime for AsyncStd {
113    type Receiver = async_std::channel::Receiver<BatchMessage>;
114    type Sender = async_std::channel::Sender<BatchMessage>;
115
116    fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
117        async_std::channel::bounded(capacity)
118    }
119}