opentelemetry_spanprocessor_any/
runtime.rs1use futures_util::{future::BoxFuture, stream::Stream};
10use std::{future::Future, time::Duration};
11
12pub trait Runtime: Clone + Send + Sync + 'static {
18 type Interval: Stream + Send;
21
22 type Delay: Future + Send;
25
26 fn interval(&self, duration: Duration) -> Self::Interval;
29
30 fn spawn(&self, future: BoxFuture<'static, ()>);
40
41 fn delay(&self, duration: Duration) -> Self::Delay;
43}
44
/// Unit marker type that selects the Tokio-backed [`Runtime`] implementation.
///
/// Only compiled when the `rt-tokio` feature is enabled.
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
#[derive(Clone, Debug)]
pub struct Tokio;
50
/// [`Runtime`] implementation that forwards every operation to Tokio.
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
impl Runtime for Tokio {
    type Delay = tokio::time::Sleep;
    type Interval = tokio_stream::wrappers::IntervalStream;

    /// Spawns `future` via `tokio::spawn`.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // The join handle is intentionally discarded: the task is never
        // awaited from here.
        drop(tokio::spawn(future));
    }

    /// Returns a `tokio::time::sleep` future for `duration`.
    fn delay(&self, duration: Duration) -> Self::Delay {
        tokio::time::sleep(duration)
    }

    /// Builds the interval stream through the crate-internal
    /// `tokio_interval_stream` helper.
    fn interval(&self, duration: Duration) -> Self::Interval {
        crate::util::tokio_interval_stream(duration)
    }
}
69
/// Unit marker type that selects the Tokio current-thread [`Runtime`]
/// implementation.
///
/// Only compiled when the `rt-tokio-current-thread` feature is enabled.
#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
#[derive(Clone, Debug)]
pub struct TokioCurrentThread;
75
/// [`Runtime`] implementation for applications that drive Tokio with a
/// current-thread scheduler.
///
/// Timers (`interval`, `delay`) are plain Tokio timers; `spawn` differs from
/// the multi-threaded implementation by running the future on its own OS
/// thread.
#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
impl Runtime for TokioCurrentThread {
    type Interval = tokio_stream::wrappers::IntervalStream;
    type Delay = tokio::time::Sleep;

    /// Builds the interval stream through the crate-internal
    /// `tokio_interval_stream` helper.
    fn interval(&self, duration: Duration) -> Self::Interval {
        crate::util::tokio_interval_stream(duration)
    }

    /// Runs `future` to completion on a freshly spawned OS thread that owns
    /// a dedicated current-thread Tokio runtime.
    ///
    /// The thread's join handle is dropped, so the work is detached from the
    /// caller.
    ///
    /// NOTE(review): presumably this avoids depending on the caller's
    /// single-threaded scheduler being polled; confirm the intent with the
    /// batch processor that calls this.
    ///
    /// # Panics
    ///
    /// The spawned thread (not the caller) panics if the Tokio runtime
    /// cannot be built.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to create Tokio current thread runtime for OpenTelemetry batch processing");
            rt.block_on(future);
        });
    }

    /// Returns a `tokio::time::sleep` future for `duration`.
    fn delay(&self, duration: Duration) -> Self::Delay {
        tokio::time::sleep(duration)
    }
}
106
/// Unit marker type that selects the async-std-backed [`Runtime`]
/// implementation.
///
/// Only compiled when the `rt-async-std` feature is enabled.
#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
#[derive(Clone, Debug)]
pub struct AsyncStd;
112
/// [`Runtime`] implementation that forwards every operation to async-std.
#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
impl Runtime for AsyncStd {
    type Delay = BoxFuture<'static, ()>;
    type Interval = async_std::stream::Interval;

    /// Spawns `future` via `async_std::task::spawn`.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // The join handle is intentionally discarded: the task is never
        // awaited from here.
        drop(async_std::task::spawn(future));
    }

    /// Returns `async_std::task::sleep(duration)` as a pinned, boxed future.
    ///
    /// The sleep future is boxed so that `Delay` can be spelled as
    /// `BoxFuture<'static, ()>`.
    fn delay(&self, duration: Duration) -> Self::Delay {
        Box::pin(async_std::task::sleep(duration))
    }

    /// Returns `async_std::stream::interval(duration)`.
    fn interval(&self, duration: Duration) -> Self::Interval {
        async_std::stream::interval(duration)
    }
}