use super::super::FeagiAsyncRuntime;
#[cfg(feature = "async-tokio")]
use core::future::Future;
#[cfg(feature = "async-tokio")]
use core::pin::Pin;
#[cfg(feature = "async-tokio")]
use core::task::{Context, Poll};
#[cfg(feature = "async-tokio")]
use core::time::Duration;
#[cfg(feature = "async-tokio")]
use tokio::runtime::{Handle, Runtime};
#[cfg(feature = "async-tokio")]
pub struct TokioRuntime {
runtime: Runtime,
}
#[cfg(feature = "async-tokio")]
impl TokioRuntime {
pub fn new() -> Self {
Self {
runtime: Runtime::new().expect("Tokio runtime failed to initialize"),
}
}
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
self.runtime.block_on(fut)
}
}
#[cfg(feature = "async-tokio")]
impl Default for TokioRuntime {
fn default() -> Self {
Self::new()
}
}
#[cfg(feature = "async-tokio")]
impl FeagiAsyncRuntime for TokioRuntime {
type TaskHandle<T: Send + 'static> = TokioTaskHandle<T>;
fn spawn<F, T>(&self, fut: F) -> Self::TaskHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
TokioTaskHandle(self.runtime.spawn(fut))
}
fn delay(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
Box::pin(tokio::time::sleep(duration))
}
fn try_block_on<F, T>(&self, future: F) -> Result<T, super::super::BlockOnError>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Ok(self.runtime.block_on(future))
}
fn with_timeout<F, T>(
&self,
future: F,
timeout: Duration,
) -> Pin<Box<dyn Future<Output = Result<T, super::super::TimeoutError>> + Send + 'static>>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Box::pin(async move {
tokio::select! {
result = future => Ok(result),
_ = tokio::time::sleep(timeout) => Err(super::super::TimeoutError),
}
})
}
}
#[cfg(feature = "async-tokio")]
impl FeagiAsyncRuntime for TokioHandle {
type TaskHandle<T: Send + 'static> = TokioTaskHandle<T>;
fn spawn<F, T>(&self, fut: F) -> Self::TaskHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
TokioTaskHandle(self.handle.spawn(fut))
}
fn delay(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
Box::pin(tokio::time::sleep(duration))
}
fn try_block_on<F, T>(&self, _future: F) -> Result<T, super::super::BlockOnError>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Err(super::super::BlockOnError::not_supported(
"TokioHandle does not support blocking. Use TokioRuntime::try_block_on() instead.",
))
}
fn with_timeout<F, T>(
&self,
future: F,
timeout: Duration,
) -> Pin<Box<dyn Future<Output = Result<T, super::super::TimeoutError>> + Send + 'static>>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Box::pin(async move {
tokio::select! {
result = future => Ok(result),
_ = tokio::time::sleep(timeout) => Err(super::super::TimeoutError),
}
})
}
}
#[cfg(feature = "async-tokio")]
pub struct TokioHandle {
handle: Handle,
}
#[cfg(feature = "async-tokio")]
impl TokioHandle {
pub fn current() -> Self {
Self {
handle: Handle::current(),
}
}
}
#[cfg(feature = "async-tokio")]
pub struct TokioTaskHandle<T>(tokio::task::JoinHandle<T>);
#[cfg(feature = "async-tokio")]
impl<T> Future for TokioTaskHandle<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
match inner.poll(cx) {
Poll::Ready(Ok(value)) => Poll::Ready(value),
Poll::Ready(Err(e)) => panic!("Spawned task panicked: {e}"),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(feature = "async-tokio")]
unsafe impl<T: Send> Send for TokioTaskHandle<T> {}