use std::task::{Context, Poll, ready};
use std::{fmt, future::Future, future::poll_fn, pin::Pin};
use async_channel::Sender;
use crate::arbiter::{Arbiter, ArbiterCommand};
#[inline]
pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
{
let task = if let Some(mut data) = crate::task::Data::load() {
compio_runtime::spawn(async move {
let mut f = std::pin::pin!(f);
poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
})
} else {
compio_runtime::spawn(f)
};
JoinHandle {
task: Some(Either::Compio(task)),
}
}
#[derive(Debug, Copy, Clone)]
pub struct JoinError;
impl fmt::Display for JoinError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "JoinError")
}
}
impl std::error::Error for JoinError {}
#[derive(Debug)]
enum Either<T> {
Compio(compio_runtime::JoinHandle<T>),
Spawn(oneshot::AsyncReceiver<T>),
}
#[derive(Debug)]
pub struct JoinHandle<T> {
task: Option<Either<T>>,
}
impl<T> JoinHandle<T> {
pub fn cancel(mut self) {
if let Some(Either::Compio(fut)) = self.task.take() {
drop(fut.cancel());
}
}
pub fn detach(mut self) {
if let Some(Either::Compio(fut)) = self.task.take() {
fut.detach();
}
}
pub fn is_finished(&self) -> bool {
match &self.task {
Some(Either::Compio(fut)) => fut.is_finished(),
Some(Either::Spawn(fut)) => fut.is_closed(),
None => true,
}
}
}
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if let Some(Either::Compio(fut)) = self.task.take() {
fut.detach();
}
}
}
impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(match self.task.as_mut() {
Some(Either::Compio(fut)) => {
ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
}
Some(Either::Spawn(fut)) => {
ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
}
None => Err(JoinError),
})
}
}
#[derive(Clone, Debug)]
pub struct Handle(Sender<ArbiterCommand>);
impl Handle {
pub(crate) fn new(sender: Sender<ArbiterCommand>) -> Self {
Self(sender)
}
pub fn current() -> Self {
Self(Arbiter::current().sender.clone())
}
pub fn notify(&self) {}
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let (tx, rx) = oneshot::async_channel();
let _ = self
.0
.try_send(ArbiterCommand::Execute(Box::pin(async move {
let result = future.await;
let _ = tx.send(result);
})));
JoinHandle {
task: Some(Either::Spawn(rx)),
}
}
}