use futures_util::stream::{unfold, Stream};
use std::{fmt::Debug, future::Future, time::Duration};
use thiserror::Error;
/// Abstraction over an executor that can run futures and produce delay futures.
///
/// Implementors must be `Clone + Send + Sync + 'static`, so a runtime value can
/// be cloned and moved into other threads or tasks.
#[cfg(feature = "experimental_async_runtime")]
pub trait Runtime: Clone + Send + Sync + 'static {
/// Hands `future` to the runtime for execution.
///
/// Nothing is returned, so this method gives the caller no handle through
/// which the future could be awaited or cancelled.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static;
/// Returns a `Send + 'static` future that resolves to `()`.
///
/// The implementations in this file resolve it once `duration` has elapsed.
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
}
/// Builds a never-ending stream that yields `()` once per completed delay.
///
/// For every item a clone of `runtime` is asked for a fresh
/// `delay(interval)` future; the item is produced when that future resolves.
/// The stream always continues with `Some`, so it never terminates on its own.
#[cfg(feature = "experimental_async_runtime")]
#[allow(dead_code)]
pub(crate) fn to_interval_stream<T: Runtime>(
    runtime: T,
    interval: Duration,
) -> impl Stream<Item = ()> {
    /// Waits for a single interval and then reports "one item, keep going".
    async fn tick<R: Runtime>(runtime: R, interval: Duration) -> Option<((), ())> {
        runtime.delay(interval).await;
        Some(((), ()))
    }

    // The unfold state carries no data; each step only needs its own runtime clone.
    unfold((), move |()| tick(runtime.clone(), interval))
}
/// Runtime marker type backed by Tokio.
///
/// Its `Runtime` implementation in this file spawns futures with
/// `tokio::spawn` and creates delays with `tokio::time::sleep`.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
#[derive(Debug, Clone)]
pub struct Tokio;
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl Runtime for Tokio {
    /// Spawns `future` as a Tokio task via `tokio::spawn`.
    ///
    /// The join handle is discarded immediately, so the task's completion is
    /// not observable through this method.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        // Explicitly drop the join handle: the task is intentionally not awaited here.
        drop(tokio::spawn(future));
    }

    /// Returns `tokio::time::sleep(duration)`.
    fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
        tokio::time::sleep(duration)
    }
}
/// Runtime marker type that runs spawned futures on a current-thread Tokio runtime.
///
/// Its `Runtime` implementation in this file starts a new OS thread per
/// `spawn` call, builds a current-thread Tokio runtime on that thread, and
/// blocks on the future there. Delays are created with `tokio::time::sleep`.
#[cfg(all(
feature = "experimental_async_runtime",
feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
feature = "experimental_async_runtime",
feature = "rt-tokio-current-thread"
)))
)]
#[derive(Debug, Clone)]
pub struct TokioCurrentThread;
#[cfg(all(
    feature = "experimental_async_runtime",
    feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(
        feature = "experimental_async_runtime",
        feature = "rt-tokio-current-thread"
    )))
)]
impl Runtime for TokioCurrentThread {
    /// Runs `future` to completion on a newly started OS thread.
    ///
    /// The new thread builds its own current-thread Tokio runtime (with all
    /// drivers enabled) and blocks on `future`. The thread's join handle is
    /// discarded, so the caller cannot wait for it through this method.
    ///
    /// # Panics
    ///
    /// The spawned thread (not the caller) panics if the Tokio runtime cannot
    /// be built.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        // The runtime is built inside the thread so that it is owned by, and
        // driven from, the thread that blocks on the future.
        std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to create Tokio current thread runtime for OpenTelemetry batch processing");
            rt.block_on(future);
        });
    }

    /// Returns `tokio::time::sleep(duration)`.
    fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
        tokio::time::sleep(duration)
    }
}
/// A `Runtime` that can also create a message channel for batch messages.
#[cfg(feature = "experimental_async_runtime")]
pub trait RuntimeChannel: Runtime {
/// Receiving half of the channel: a `Send` stream yielding the messages.
type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
/// Sending half of the channel: accepts messages through `TrySend`.
type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;
/// Creates a sender/receiver pair for messages of type `T`.
///
/// `capacity` is passed to the underlying channel implementation; the
/// implementations in this file forward it to `tokio::sync::mpsc::channel`.
fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>);
}
/// Reasons a `TrySend::try_send` call can fail.
#[cfg(feature = "experimental_async_runtime")]
#[derive(Debug, Error)]
pub enum TrySendError {
/// The channel has no room for the message.
#[error("cannot send message to batch processor as the channel is full")]
ChannelFull,
/// The channel is closed.
#[error("cannot send message to batch processor as the channel is closed")]
ChannelClosed,
/// Any other failure; displays as, and converts from, the boxed inner error.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}
/// Sending half of a channel, exposed through a synchronous (non-`async`) send attempt.
#[cfg(feature = "experimental_async_runtime")]
pub trait TrySend: Sync + Send {
/// Type of the messages this sender accepts.
type Message;
/// Attempts to send `item`, returning a `TrySendError` when it is not accepted.
fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>;
}
#[cfg(all(
    feature = "experimental_async_runtime",
    any(feature = "rt-tokio", feature = "rt-tokio-current-thread")
))]
impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
    type Message = T;

    /// Forwards to Tokio's inherent `Sender::try_send` and translates its error.
    ///
    /// Tokio's `Full` becomes `TrySendError::ChannelFull` and `Closed` becomes
    /// `TrySendError::ChannelClosed`; the rejected message carried inside
    /// Tokio's error is discarded.
    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
        use tokio::sync::mpsc::error::TrySendError as ChannelError;

        // Spell out the path so it is obvious this is Tokio's inherent method,
        // not a recursive call into this trait method.
        match tokio::sync::mpsc::Sender::try_send(self, item) {
            Ok(()) => Ok(()),
            Err(ChannelError::Full(_)) => Err(TrySendError::ChannelFull),
            Err(ChannelError::Closed(_)) => Err(TrySendError::ChannelClosed),
        }
    }
}
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl RuntimeChannel for Tokio {
    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

    /// Creates a Tokio mpsc channel with the given `capacity`.
    ///
    /// The receiving half is wrapped in a `ReceiverStream` so it can be
    /// consumed as a stream. Per Tokio's documentation, `mpsc::channel`
    /// panics when `capacity` is 0.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        use tokio_stream::wrappers::ReceiverStream;

        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        (tx, ReceiverStream::new(rx))
    }
}
#[cfg(all(
    feature = "experimental_async_runtime",
    feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(
        feature = "experimental_async_runtime",
        feature = "rt-tokio-current-thread"
    )))
)]
impl RuntimeChannel for TokioCurrentThread {
    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

    /// Creates a Tokio mpsc channel with the given `capacity`.
    ///
    /// The receiving half is wrapped in a `ReceiverStream` so it can be
    /// consumed as a stream. Per Tokio's documentation, `mpsc::channel`
    /// panics when `capacity` is 0.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        use tokio_stream::wrappers::ReceiverStream;

        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        (tx, ReceiverStream::new(rx))
    }
}
/// Runtime marker type that uses plain OS threads rather than Tokio.
///
/// Its `Runtime` implementation in this file runs each spawned future on a
/// new thread with `futures_executor::block_on`, and implements `delay` with
/// the blocking `std::thread::sleep`.
#[cfg(feature = "experimental_async_runtime")]
#[derive(Debug, Clone, Copy)]
pub struct NoAsync;
#[cfg(feature = "experimental_async_runtime")]
impl Runtime for NoAsync {
    /// Runs `future` to completion on a newly started OS thread.
    ///
    /// The thread drives the future with `futures_executor::block_on`; its
    /// join handle is discarded.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        std::thread::spawn(move || futures_executor::block_on(future));
    }

    /// Returns a future that, when polled, blocks the polling thread for
    /// `duration` using `std::thread::sleep` and then completes.
    // An `async fn` cannot express the `Send + 'static` bound demanded by the
    // trait signature, hence the explicit async block and the lint allowance.
    #[allow(clippy::manual_async_fn)]
    fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
        async move { std::thread::sleep(duration) }
    }
}