opentelemetry_spanprocessor_any/sdk/trace/
runtime.rs1#[cfg(feature = "rt-async-std")]
8use crate::runtime::AsyncStd;
9use crate::runtime::Runtime;
10#[cfg(feature = "rt-tokio")]
11use crate::runtime::Tokio;
12#[cfg(feature = "rt-tokio-current-thread")]
13use crate::runtime::TokioCurrentThread;
14use crate::sdk::trace::BatchMessage;
15use crate::trace::TraceError;
16use futures_util::stream::Stream;
17use std::fmt::Debug;
18
19#[cfg(any(
20 feature = "rt-tokio",
21 feature = "rt-tokio-current-thread",
22 feature = "rt-async-std"
23))]
24const CHANNEL_FULL_ERROR: &str =
25 "cannot send span to the batch span processor because the channel is full";
26#[cfg(any(
27 feature = "rt-tokio",
28 feature = "rt-tokio-current-thread",
29 feature = "rt-async-std"
30))]
31const CHANNEL_CLOSED_ERROR: &str =
32 "cannot send span to the batch span processor because the channel is closed";
33
34pub trait TraceRuntime: Runtime {
40 type Receiver: Stream<Item = BatchMessage> + Send;
42
43 type Sender: TrySend + Debug;
45
46 fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver);
48}
49
50pub trait TrySend: Sync + Send {
52 fn try_send(&self, item: BatchMessage) -> Result<(), TraceError>;
56}
57
58#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
59impl TrySend for tokio::sync::mpsc::Sender<BatchMessage> {
60 fn try_send(&self, item: BatchMessage) -> Result<(), TraceError> {
61 self.try_send(item).map_err(|err| match err {
62 tokio::sync::mpsc::error::TrySendError::Full(_) => TraceError::from(CHANNEL_FULL_ERROR),
63 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
64 TraceError::from(CHANNEL_CLOSED_ERROR)
65 }
66 })
67 }
68}
69
70#[cfg(feature = "rt-tokio")]
71#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
72impl TraceRuntime for Tokio {
73 type Receiver = tokio_stream::wrappers::ReceiverStream<BatchMessage>;
74 type Sender = tokio::sync::mpsc::Sender<BatchMessage>;
75
76 fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
77 let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
78 (
79 sender,
80 tokio_stream::wrappers::ReceiverStream::new(receiver),
81 )
82 }
83}
84
85#[cfg(feature = "rt-tokio-current-thread")]
86#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
87impl TraceRuntime for TokioCurrentThread {
88 type Receiver = tokio_stream::wrappers::ReceiverStream<BatchMessage>;
89 type Sender = tokio::sync::mpsc::Sender<BatchMessage>;
90
91 fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
92 let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
93 (
94 sender,
95 tokio_stream::wrappers::ReceiverStream::new(receiver),
96 )
97 }
98}
99
100#[cfg(feature = "rt-async-std")]
101impl TrySend for async_std::channel::Sender<BatchMessage> {
102 fn try_send(&self, item: BatchMessage) -> Result<(), TraceError> {
103 self.try_send(item).map_err(|err| match err {
104 async_std::channel::TrySendError::Full(_) => TraceError::from(CHANNEL_FULL_ERROR),
105 async_std::channel::TrySendError::Closed(_) => TraceError::from(CHANNEL_CLOSED_ERROR),
106 })
107 }
108}
109
110#[cfg(feature = "rt-async-std")]
111#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
112impl TraceRuntime for AsyncStd {
113 type Receiver = async_std::channel::Receiver<BatchMessage>;
114 type Sender = async_std::channel::Sender<BatchMessage>;
115
116 fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
117 async_std::channel::bounded(capacity)
118 }
119}