use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::channel::oneshot;
use tokio_with_wasm::alias as tww_alias;
use tww_alias::task as tww_task;
/// Bridges a `tokio_with_wasm` join handle onto this module's oneshot channel.
///
/// A local task is scheduled with `wasm_bindgen_futures::spawn_local`; it awaits
/// `handle` and sends the outcome through `tx`.
///
/// * `tx` - sending half of the channel whose receiver backs a [`JoinHandle`].
/// * `handle` - future resolving to the upstream task's result.
///
/// Every upstream join error is reported as a [`JoinError`] with
/// `cancelled == true`; the upstream error value itself is discarded.
fn forward_tww_handle<H, T>(tx: oneshot::Sender<Result<T, JoinError>>, handle: H)
where
    H: Future<Output = Result<T, tww_task::JoinError>> + 'static,
    T: 'static,
{
    wasm_bindgen_futures::spawn_local(async move {
        // Only the error side needs translating, so `map_err` replaces a
        // `match` that merely re-wrapped the `Ok` value.
        let outcome = handle
            .await
            .map_err(|_upstream| JoinError { cancelled: true });
        // The send result is deliberately ignored: nothing useful can be done
        // here if the value cannot be delivered.
        drop(tx.send(outcome));
    });
}
/// Spawns `future` as a background task and returns a [`JoinHandle`] for its output.
///
/// The execution strategy depends on `crate::thread::is_worker_thread()`:
///
/// * when it returns `false`, the future is handed to `tww_task::spawn` and its
///   result is relayed through `forward_tww_handle`;
/// * when it returns `true`, the future is driven by
///   `wasm_bindgen_futures::spawn_local`, bracketed by
///   `wasm_safe_thread::task_begin()` / `wasm_safe_thread::task_finished()`.
///
/// The returned handle resolves once the task has sent its output.
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + 'static,
    T: 'static,
{
    let (sender, receiver) = oneshot::channel();
    let join_handle = JoinHandle { rx: receiver };

    if !crate::thread::is_worker_thread() {
        forward_tww_handle(sender, tww_task::spawn(future));
        return join_handle;
    }

    // NOTE(review): presumably registers an in-flight task with the worker
    // runtime so it is kept alive until `task_finished` — confirm against
    // `wasm_safe_thread`.
    wasm_safe_thread::task_begin();
    wasm_bindgen_futures::spawn_local(async move {
        let output = future.await;
        // The send result is ignored on purpose; the output is simply
        // discarded if it cannot be delivered.
        drop(sender.send(Ok(output)));
        // Must run after the output has been sent, mirroring `task_begin` above.
        wasm_safe_thread::task_finished();
    });

    join_handle
}
/// Runs the closure `f` off the current task and returns a [`JoinHandle`] for
/// its return value.
///
/// The execution strategy depends on `crate::thread::is_worker_thread()`:
///
/// * when it returns `true`, `f` runs on a thread started with
///   `crate::thread::spawn`, whose own handle is dropped immediately;
/// * when it returns `false`, `f` is handed to `tww_task::spawn_blocking` and
///   its result is relayed through `forward_tww_handle`.
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (sender, receiver) = oneshot::channel();

    if crate::thread::is_worker_thread() {
        let thread_handle = crate::thread::spawn(move || {
            let value = f();
            // The send result is ignored on purpose; the value is simply
            // discarded if it cannot be delivered.
            drop(sender.send(Ok(value)));
        });
        // Completion is observed through the oneshot channel, so the thread's
        // own handle is not kept.
        drop(thread_handle);
    } else {
        let upstream = tww_task::spawn_blocking(f);
        forward_tww_handle(sender, upstream);
    }

    JoinHandle { rx: receiver }
}
/// Handle to a task started with [`spawn`] or [`spawn_blocking`].
///
/// Awaiting it yields the task's output, or a [`JoinError`] if no output could
/// be obtained.
pub struct JoinHandle<T> {
    /// Receiving half of the channel on which the task publishes its result.
    rx: oneshot::Receiver<Result<T, JoinError>>,
}

impl<T> Future for JoinHandle<T> {
    type Output = Result<T, JoinError>;

    /// Polls the underlying receiver.
    ///
    /// A received value is passed through unchanged. A receive error is
    /// reported as a [`JoinError`] with `cancelled == true`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        Pin::new(&mut this.rx).poll(cx).map(|received| {
            received.unwrap_or_else(|_canceled| Err(JoinError { cancelled: true }))
        })
    }
}
/// Error produced when a spawned task did not deliver a result.
#[derive(Debug)]
pub struct JoinError {
    /// Whether the failure is reported as a cancellation.
    cancelled: bool,
}

impl JoinError {
    /// Returns the value of the `cancelled` flag carried by this error.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        let Self { cancelled } = self;
        *cancelled
    }
}

impl std::fmt::Display for JoinError {
    /// Writes a fixed, human-readable description; the text does not depend on
    /// the `cancelled` flag.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const MESSAGE: &str = "task failed to execute to completion";
        formatter.write_str(MESSAGE)
    }
}

impl std::error::Error for JoinError {}