#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
mod queue;
use std::cell::Cell;
use std::error;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use async_event::Event;
use diatomic_waker::primitives::DiatomicWaker;
use recycle_box::RecycleBox;
use queue::{PopError, PushError, Queue};
use recycle_box::coerce_box;
use crate::model::{Context, Model};
thread_local! {
    /// Per-thread signed balance of channel messages.
    ///
    /// `Sender::send` adds one (wrapping) after a message was successfully
    /// pushed from this thread, and `Receiver::recv` subtracts one (wrapping)
    /// when it pops a message on this thread. The value can therefore be
    /// negative on a thread that receives more than it sends.
    pub(crate) static THREAD_MSG_COUNT: Cell<isize> = const { Cell::new(0) };
}
/// State shared, through an `Arc`, by the `Receiver`, `Sender` and `Observer`
/// handles of one channel.
struct Inner<M> {
/// Queue of type-erased message closures (`dyn MessageFn<M>`).
queue: Queue<dyn MessageFn<M>>,
/// Signal the receiver waits on in `recv`; senders notify it after a
/// successful push and when they close the channel.
receiver_signal: DiatomicWaker,
/// Signal senders wait on in `send`; the receiver notifies it after taking a
/// message and when the channel is closed.
sender_signal: Event,
}
impl<M: 'static> Inner<M> {
    /// Builds the shared channel state.
    ///
    /// `capacity` is handed unchanged to `Queue::new`; both signalling
    /// primitives start out freshly constructed.
    fn new(capacity: usize) -> Self {
        let queue = Queue::new(capacity);
        let receiver_signal = DiatomicWaker::new();
        let sender_signal = Event::new();

        Self {
            queue,
            receiver_signal,
            sender_signal,
        }
    }
}
/// Receiving end of a channel.
pub(crate) struct Receiver<M> {
/// State shared with the senders and observers created from this receiver.
inner: Arc<Inner<M>>,
/// Reusable allocation handed to each received message so it can box its
/// future; it is taken out at the start of message processing in `recv` and
/// put back once the message future has completed.
future_box: Option<RecycleBox<()>>,
}
impl<M: Model> Receiver<M> {
    /// Creates a new channel and returns its receiving end.
    ///
    /// `capacity` is forwarded unchanged to the underlying queue.
    pub(crate) fn new(capacity: usize) -> Self {
        Receiver {
            inner: Arc::new(Inner::new(capacity)),
            future_box: Some(RecycleBox::new(())),
        }
    }

    /// Returns a new sending handle attached to this channel.
    pub(crate) fn sender(&self) -> Sender<M> {
        Sender {
            inner: Arc::clone(&self.inner),
        }
    }

    /// Returns a handle that can query the length of this channel's queue.
    pub(crate) fn observer(&self) -> impl ChannelObserver + use<M> {
        Observer {
            inner: Arc::clone(&self.inner),
        }
    }

    /// Waits for the next message, invokes it on `model`, `cx` and `env`, and
    /// awaits the future it produces.
    ///
    /// One waiting sender is notified as soon as the message has been turned
    /// into a future, i.e. before that future is awaited.
    ///
    /// # Errors
    ///
    /// Returns [`RecvError`] when the queue reports `PopError::Closed` instead
    /// of yielding a message.
    ///
    /// # Cancellation
    ///
    /// If the future returned by a previous call was dropped while the message
    /// future was still pending, the recycled allocation was dropped with it.
    /// In that case a fresh box is allocated rather than panicking.
    pub(crate) async fn recv(
        &mut self,
        model: &mut M,
        cx: &Context<M>,
        env: &mut M::Env,
    ) -> Result<(), RecvError> {
        // SAFETY: per the `diatomic-waker` documentation, `wait_until` must
        // not be used concurrently from several threads. `recv` takes
        // `&mut self`, and the other handles defined in this file (`Sender`,
        // `Observer`) only ever call `notify` on `receiver_signal`, so this is
        // the sole waiter.
        let msg = unsafe {
            self.inner
                .receiver_signal
                .wait_until(|| match self.inner.queue.pop() {
                    Ok(msg) => Some(Some(msg)),
                    // Nothing queued yet: keep waiting.
                    Err(PopError::Empty) => None,
                    // Closed: stop waiting and report it to the caller.
                    Err(PopError::Closed) => Some(None),
                })
                .await
        };

        let Some(mut msg) = msg else {
            return Err(RecvError);
        };

        // One fewer in-flight message from the point of view of this thread.
        THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1));

        // Reuse the recycled allocation when it is available; it is missing
        // only if an earlier `recv` future was dropped mid-await.
        let future_box = self
            .future_box
            .take()
            .unwrap_or_else(|| RecycleBox::new(()));
        let fut = msg.call_once(model, cx, env, future_box);

        // The message is no longer needed once its future exists; drop it
        // before telling one waiting sender to retry (presumably this is what
        // makes room in the queue).
        drop(msg);
        self.inner.sender_signal.notify_one();

        let mut fut = RecycleBox::into_pin(fut);
        fut.as_mut().await;

        // Keep the allocation for the next message.
        self.future_box = Some(RecycleBox::vacate_pinned(fut));

        Ok(())
    }

    /// Closes the queue and wakes all waiting senders.
    ///
    /// Does nothing if the queue already reports itself as closed.
    // NOTE(review): not called anywhere in this file, hence the lint
    // override; confirm whether other modules still rely on it.
    #[allow(unused)]
    pub(crate) fn close(&self) {
        if !self.inner.queue.is_closed() {
            self.inner.queue.close();
            self.inner.sender_signal.notify_all();
        }
    }

    /// Returns an identifier derived from the address of the shared channel
    /// state, so all handles of one channel agree on it while it is alive.
    pub(crate) fn channel_id(&self) -> ChannelId {
        ChannelId(Arc::as_ptr(&self.inner) as usize)
    }
}
impl<M> Drop for Receiver<M> {
    /// Closes the queue and wakes every waiting sender when the receiver is
    /// dropped.
    fn drop(&mut self) {
        let shared = &*self.inner;
        shared.queue.close();
        shared.sender_signal.notify_all();
    }
}
impl<M> fmt::Debug for Receiver<M> {
    /// Prints an opaque `Receiver { .. }` without exposing any field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Receiver");
        repr.finish_non_exhaustive()
    }
}
/// Sending end of a channel; cloning it yields another handle to the same
/// shared state.
pub(crate) struct Sender<M: 'static> {
/// State shared with the receiver and the other handles of the channel.
inner: Arc<Inner<M>>,
}
impl<M: Model> Sender<M> {
pub(crate) async fn send<F>(&self, msg_fn: F) -> Result<(), SendError>
where
F: for<'a> FnOnce(
&'a mut M,
&'a Context<M>,
&'a mut M::Env,
RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>
+ Send
+ 'static,
{
let mut msg_fn = Some(|vacated_box| -> RecycleBox<dyn MessageFn<M>> {
coerce_box!(RecycleBox::recycle(vacated_box, MessageFnOnce::new(msg_fn)))
});
let success = self
.inner
.sender_signal
.wait_until(|| {
match self.inner.queue.push(msg_fn.take().unwrap()) {
Ok(()) => Some(true),
Err(PushError::Full(m)) => {
msg_fn = Some(m);
None
}
Err(PushError::Closed) => Some(false),
}
})
.await;
if success {
self.inner.receiver_signal.notify();
THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_add(1));
Ok(())
} else {
Err(SendError)
}
}
#[allow(unused)]
pub(crate) fn close(&self) {
self.inner.queue.close();
self.inner.receiver_signal.notify();
self.inner.sender_signal.notify_all();
}
#[allow(unused)]
pub(crate) fn is_closed(&self) -> bool {
self.inner.queue.is_closed()
}
pub(crate) fn channel_id(&self) -> usize {
Arc::as_ptr(&self.inner) as usize
}
}
impl<M> Clone for Sender<M> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<M> fmt::Debug for Sender<M> {
    /// Prints an opaque `Sender { .. }` without exposing any field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Sender");
        repr.finish_non_exhaustive()
    }
}
/// Sendable, model-type-agnostic view of a channel that exposes only its
/// length.
pub(crate) trait ChannelObserver: Send {
/// Returns the current length of the observed channel. For `Observer` this is
/// whatever the underlying queue's `len` reports (presumably the number of
/// queued messages; confirm against `queue`).
fn len(&self) -> usize;
}
/// Handle returned by `Receiver::observer`; implements `ChannelObserver` by
/// reading the queue of the shared channel state.
#[derive(Clone)]
pub(crate) struct Observer<M: 'static> {
/// State shared with the receiver and the senders of the channel.
inner: Arc<Inner<M>>,
}
impl<M: Model> ChannelObserver for Observer<M> {
    /// Returns the length reported by the shared queue.
    fn len(&self) -> usize {
        let queue = &self.inner.queue;
        queue.len()
    }
}
/// A message that can be invoked through `&mut self`, which makes it usable
/// behind a `dyn` pointer such as `RecycleBox<dyn MessageFn<M>>`.
///
/// Despite the `&mut self` receiver it is meant to be called once: the only
/// implementation in this file, `MessageFnOnce`, panics on a second call.
trait MessageFn<M: Model>: Send {
/// Runs the message against `model`, `cx` and `env`, and returns the
/// resulting future.
///
/// `recycle_box` is an empty box passed through to the message so it can
/// store the returned future in it.
fn call_once<'a>(
&mut self,
model: &'a mut M,
cx: &'a Context<M>,
env: &'a mut M::Env,
recycle_box: RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>;
}
/// Adapter that stores a one-shot closure in an `Option` so that it can be
/// moved out, and hence called, through a `&mut self` method.
struct MessageFnOnce<F, M> {
    /// The wrapped closure; `Some` from construction until it is taken out.
    msg_fn: Option<F>,
    /// Ties the adapter to the model type `M` without storing an `M`.
    _phantom: PhantomData<fn(&mut M)>,
}

impl<F, M> MessageFnOnce<F, M> {
    /// Wraps `msg_fn` so that it is ready to be taken out once.
    fn new(msg_fn: F) -> Self {
        let msg_fn = Some(msg_fn);

        Self {
            msg_fn,
            _phantom: PhantomData,
        }
    }
}
impl<F, M: Model> MessageFn<M> for MessageFnOnce<F, M>
where
    F: for<'a> FnOnce(
            &'a mut M,
            &'a Context<M>,
            &'a mut M::Env,
            RecycleBox<()>,
        ) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>
        + Send,
{
    /// Takes the stored closure out and calls it with the given arguments.
    ///
    /// # Panics
    ///
    /// Panics if called more than once, since the closure is gone after the
    /// first call.
    fn call_once<'a>(
        &mut self,
        model: &'a mut M,
        cx: &'a Context<M>,
        env: &'a mut M::Env,
        recycle_box: RecycleBox<()>,
    ) -> RecycleBox<dyn Future<Output = ()> + Send + 'a> {
        let msg_fn = self.msg_fn.take().unwrap();

        msg_fn(model, cx, env, recycle_box)
    }
}
/// Opaque channel identifier wrapping an integer.
///
/// `Receiver::channel_id` builds it from the address of the shared channel
/// state.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
pub(crate) struct ChannelId(usize);

impl fmt::Display for ChannelId {
    /// Writes the wrapped integer in decimal; width, fill and other
    /// formatter flags are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{}", self.0))
    }
}
/// Error returned by `Sender::send` when the message could not be queued
/// because the channel is closed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct SendError;

// Mirror `RecvError`, so that both channel errors can be displayed and used
// wherever a `std::error::Error` is expected.
impl error::Error for SendError {}

impl fmt::Display for SendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        "sending into a closed channel".fmt(f)
    }
}
/// Error returned by `Receiver::recv` when the channel is closed and no
/// message was obtained.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct RecvError;

impl fmt::Display for RecvError {
    /// Writes a fixed, human-readable description, honouring the formatter's
    /// padding options like any string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <str as fmt::Display>::fmt("receiving from a closed channel", f)
    }
}

impl error::Error for RecvError {}