pub mod mpsc {
use std::fmt;
pub use ::kanal::ReceiveError;
pub use ::kanal::SendError;
#[derive(Debug)]
pub struct Sender<T>(::kanal::AsyncSender<T>);
#[derive(Debug)]
pub struct Receiver<T>(::kanal::AsyncReceiver<T>);
#[derive(Debug, PartialEq, Eq)]
pub enum TryRecvError {
Empty,
Disconnected,
}
#[derive(Debug, PartialEq, Eq)]
pub enum TrySendError<T> {
Full(T),
Disconnected(T),
}
impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Empty => write!(f, "receive failed because channel is empty"),
Self::Disconnected => {
write!(f, "receive failed because sender dropped unexpectedly")
}
}
}
}
impl std::error::Error for TryRecvError {}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Full(_) => write!(f, "send failed because channel is full"),
Self::Disconnected(_) => {
write!(f, "send failed because receiver dropped unexpectedly")
}
}
}
}
impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
pub fn channel<T>(size: usize) -> (Sender<T>, Receiver<T>) {
let (tx, rx) = ::kanal::bounded_async(size);
(Sender(tx), Receiver(rx))
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T> Sender<T> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn send(&self, data: T) -> Result<(), SendError> {
self.0.send(data).await
}
#[cfg(target_arch = "wasm32")]
pub async fn send(&self, mut data: T) -> Result<(), SendError> {
loop {
match self.try_send(data) {
Ok(()) => return Ok(()),
Err(TrySendError::Full(value)) => {
data = value;
crate::runtime::wasm_event_loop_yield().await;
}
Err(TrySendError::Disconnected(_)) => return Err(SendError::ReceiveClosed),
}
}
}
pub fn try_send(&self, data: T) -> Result<(), TrySendError<T>> {
let mut data = Some(data);
match self.0.try_send_option(&mut data) {
Ok(true) => Ok(()),
Ok(false) => Err(TrySendError::Full(data.expect("send data lost"))),
Err(_) => Err(TrySendError::Disconnected(data.expect("send data lost"))),
}
}
pub fn is_closed(&self) -> bool {
self.0.is_disconnected() || self.0.is_closed()
}
pub fn close(&self) -> Result<(), SendError> {
self.0.close().map_err(|_| SendError::Closed)
}
}
impl<T> Receiver<T> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn recv(&self) -> Option<T> {
self.0.recv().await.ok()
}
#[cfg(target_arch = "wasm32")]
pub async fn recv(&self) -> Option<T> {
loop {
match self.try_recv() {
Ok(value) => return Some(value),
Err(TryRecvError::Empty) => {
crate::runtime::wasm_event_loop_yield().await;
}
Err(TryRecvError::Disconnected) => return None,
}
}
}
pub fn try_recv(&self) -> Result<T, TryRecvError> {
match self.0.try_recv() {
Ok(Some(v)) => Ok(v),
Ok(None) => Err(TryRecvError::Empty),
Err(_) => Err(TryRecvError::Disconnected),
}
}
}
}
pub mod oneshot {
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub use futures::channel::oneshot::Canceled;
#[derive(Debug)]
pub struct Sender<T>(futures::channel::oneshot::Sender<T>);
#[derive(Debug)]
pub struct Receiver<T> {
inner: futures::channel::oneshot::Receiver<T>,
#[cfg(target_arch = "wasm32")]
wake_scheduled: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (tx, rx) = futures::channel::oneshot::channel();
(
Sender(tx),
Receiver {
inner: rx,
#[cfg(target_arch = "wasm32")]
wake_scheduled: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
},
)
}
impl<T> Sender<T> {
pub fn send(self, data: T) -> Result<(), T> {
self.0.send(data)
}
}
impl<T> Future for Receiver<T> {
type Output = Result<T, Canceled>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(target_arch = "wasm32"))]
{
Pin::new(&mut self.inner).poll(cx)
}
#[cfg(target_arch = "wasm32")]
{
let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
match Pin::new(&mut self.inner).poll(&mut noop_cx) {
Poll::Ready(result) => Poll::Ready(result),
Poll::Pending => {
schedule_wake(&self.wake_scheduled, cx);
Poll::Pending
}
}
}
}
}
#[cfg(target_arch = "wasm32")]
fn schedule_wake(
wake_scheduled: &std::sync::Arc<std::sync::atomic::AtomicBool>,
cx: &Context<'_>,
) {
use std::sync::atomic::Ordering;
if !wake_scheduled.swap(true, Ordering::AcqRel) {
let wake_scheduled_for_callback = wake_scheduled.clone();
let waker = cx.waker().clone();
let callback = wasm_bindgen::closure::Closure::once_into_js(move || {
wake_scheduled_for_callback.store(false, Ordering::Release);
waker.wake();
});
let function = wasm_bindgen::JsCast::unchecked_ref::<js_sys::Function>(&callback);
if let Some(window) = web_sys::window() {
let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(function, 1);
} else if let Ok(scope) =
wasm_bindgen::JsCast::dyn_into::<web_sys::WorkerGlobalScope>(js_sys::global())
{
let _ = scope.set_timeout_with_callback_and_timeout_and_arguments_0(function, 1);
} else {
wake_scheduled.store(false, Ordering::Release);
cx.waker().wake_by_ref();
}
}
}
}