opentelemetry_spanprocessor_any/
runtime.rs

//! Provides an abstraction of several async runtimes.
//!
//! This allows OpenTelemetry to work with any current or future runtime. There are currently
//! built-in implementations for [Tokio] and [async-std].
//!
//! [Tokio]: https://crates.io/crates/tokio
//! [async-std]: https://crates.io/crates/async-std
8
9use futures_util::{future::BoxFuture, stream::Stream};
10use std::{future::Future, time::Duration};
11
12/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
13/// OpenTelemetry to work with any current and hopefully future runtime implementation.
14///
15/// [Tokio]: https://crates.io/crates/tokio
16/// [async-std]: https://crates.io/crates/async-std
17pub trait Runtime: Clone + Send + Sync + 'static {
18    /// A future stream, which returns items in a previously specified interval. The item type is
19    /// not important.
20    type Interval: Stream + Send;
21
22    /// A future, which resolves after a previously specified amount of time. The output type is
23    /// not important.
24    type Delay: Future + Send;
25
26    /// Create a [Stream][futures_util::stream::Stream], which returns a new item every
27    /// [Duration][std::time::Duration].
28    fn interval(&self, duration: Duration) -> Self::Interval;
29
30    /// Spawn a new task or thread, which executes the given future.
31    ///
32    /// # Note
33    ///
34    /// This is mainly used to run batch span processing in the background. Note, that the function
35    /// does not return a handle. OpenTelemetry will use a different way to wait for the future to
36    /// finish when TracerProvider gets shutdown. At the moment this happens by blocking the
37    /// current thread. This means runtime implementations need to make sure they can still execute
38    /// the given future even if the main thread is blocked.
39    fn spawn(&self, future: BoxFuture<'static, ()>);
40
41    /// Return a new future, which resolves after the specified [Duration][std::time::Duration].
42    fn delay(&self, duration: Duration) -> Self::Delay;
43}
44
/// [`Runtime`] implementation for Tokio's multi thread runtime.
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
#[derive(Debug, Clone)]
pub struct Tokio;

#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
impl Runtime for Tokio {
    type Delay = tokio::time::Sleep;
    type Interval = tokio_stream::wrappers::IntervalStream;

    /// Hands the future to `tokio::spawn`.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // The join handle is discarded on purpose: `Runtime::spawn` does not expose one.
        drop(tokio::spawn(future));
    }

    /// Delegates to the crate's `tokio_interval_stream` helper.
    fn interval(&self, duration: Duration) -> Self::Interval {
        use crate::util::tokio_interval_stream;

        tokio_interval_stream(duration)
    }

    /// Returns Tokio's `Sleep` future for the requested duration.
    fn delay(&self, duration: Duration) -> Self::Delay {
        use tokio::time::sleep;

        sleep(duration)
    }
}
69
/// [`Runtime`] implementation for Tokio's current thread runtime.
///
/// Unlike [`Runtime::spawn`] implementations that hand the future to the ambient scheduler,
/// this one runs the spawned future on a dedicated OS thread; see the comment in `spawn` for
/// the reason.
#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
#[derive(Debug, Clone)]
pub struct TokioCurrentThread;

#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
impl Runtime for TokioCurrentThread {
    type Interval = tokio_stream::wrappers::IntervalStream;
    type Delay = tokio::time::Sleep;

    /// Delegates to the crate's `tokio_interval_stream` helper.
    fn interval(&self, duration: Duration) -> Self::Interval {
        crate::util::tokio_interval_stream(duration)
    }

    /// Runs `future` to completion on a freshly spawned OS thread that owns its own
    /// current-thread Tokio runtime.
    ///
    /// If that runtime cannot be built, the panic is raised inside the background thread, not
    /// in the caller; the thread's join handle is not kept.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // We cannot force push tracing in current thread tokio scheduler because we rely on
        // BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
        // shutdown function so that the runtime will not finish the blocked task and kill any
        // remaining tasks. But there is only one thread to run task, so it's a deadlock.
        //
        // Thus, we spawn the background task in a separate thread.
        std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to create Tokio current thread runtime for OpenTelemetry batch processing");
            rt.block_on(future);
        });
    }

    /// Returns Tokio's `Sleep` future for the requested duration.
    fn delay(&self, duration: Duration) -> Self::Delay {
        tokio::time::sleep(duration)
    }
}
106
/// [`Runtime`] implementation for async-std.
#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
#[derive(Debug, Clone)]
pub struct AsyncStd;

#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
impl Runtime for AsyncStd {
    type Delay = BoxFuture<'static, ()>;
    type Interval = async_std::stream::Interval;

    /// Hands the future to `async_std::task::spawn`.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // The join handle is discarded on purpose: `Runtime::spawn` does not expose one.
        drop(async_std::task::spawn(future));
    }

    /// Returns async-std's interval stream for the requested period.
    fn interval(&self, duration: Duration) -> Self::Interval {
        use async_std::stream::interval;

        interval(duration)
    }

    /// Wraps async-std's sleep future in a pinned box so it matches `Self::Delay`.
    fn delay(&self, duration: Duration) -> Self::Delay {
        let sleeping = async_std::task::sleep(duration);
        Box::pin(sleeping)
    }
}