#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
mod queue;
use std::error;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::sync::Arc;
use async_event::Event;
use diatomic_waker::primitives::DiatomicWaker;
use recycle_box::RecycleBox;
use queue::{PopError, PushError, Queue};
use recycle_box::coerce_box;
use crate::model::Model;
use crate::time::Scheduler;
struct Inner<M> {
queue: Queue<dyn MessageFn<M>>,
receiver_signal: DiatomicWaker,
sender_signal: Event,
sender_count: AtomicUsize,
}
impl<M: 'static> Inner<M> {
fn new(capacity: usize) -> Self {
Self {
queue: Queue::new(capacity),
receiver_signal: DiatomicWaker::new(),
sender_signal: Event::new(),
sender_count: AtomicUsize::new(0),
}
}
}
pub(crate) struct Receiver<M> {
inner: Arc<Inner<M>>,
future_box: Option<RecycleBox<()>>,
}
impl<M: Model> Receiver<M> {
pub(crate) fn new(capacity: usize) -> Self {
let inner = Arc::new(Inner::new(capacity));
Receiver {
inner,
future_box: Some(RecycleBox::new(())),
}
}
pub(crate) fn sender(&self) -> Sender<M> {
self.inner.sender_count.fetch_add(1, Ordering::Relaxed);
Sender {
inner: self.inner.clone(),
}
}
pub(crate) async fn recv(
&mut self,
model: &mut M,
scheduler: &Scheduler<M>,
) -> Result<(), RecvError> {
let msg = unsafe {
self.inner
.receiver_signal
.wait_until(|| match self.inner.queue.pop() {
Ok(msg) => Some(Some(msg)),
Err(PopError::Empty) => None,
Err(PopError::Closed) => Some(None),
})
.await
};
match msg {
Some(mut msg) => {
let fut = msg.call_once(model, scheduler, self.future_box.take().unwrap());
self.inner.sender_signal.notify_one();
let mut fut = RecycleBox::into_pin(fut);
fut.as_mut().await;
self.future_box = Some(RecycleBox::vacate_pinned(fut));
Ok(())
}
None => Err(RecvError),
}
}
#[allow(unused)]
pub(crate) fn close(&self) {
if !self.inner.queue.is_closed() {
self.inner.queue.close();
self.inner.sender_signal.notify_all();
}
}
pub(crate) fn channel_id(&self) -> ChannelId {
ChannelId(NonZeroUsize::new(&*self.inner as *const Inner<M> as usize).unwrap())
}
}
impl<M> Drop for Receiver<M> {
fn drop(&mut self) {
self.inner.queue.close();
self.inner.sender_signal.notify_all();
}
}
impl<M> fmt::Debug for Receiver<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Receiver").finish_non_exhaustive()
}
}
pub(crate) struct Sender<M: 'static> {
inner: Arc<Inner<M>>,
}
impl<M: Model> Sender<M> {
pub(crate) async fn send<F>(&self, msg_fn: F) -> Result<(), SendError>
where
F: for<'a> FnOnce(
&'a mut M,
&'a Scheduler<M>,
RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>
+ Send
+ 'static,
{
let mut msg_fn = Some(|vacated_box| -> RecycleBox<dyn MessageFn<M>> {
coerce_box!(RecycleBox::recycle(vacated_box, MessageFnOnce::new(msg_fn)))
});
let success = self
.inner
.sender_signal
.wait_until(|| {
match self.inner.queue.push(msg_fn.take().unwrap()) {
Ok(()) => Some(true),
Err(PushError::Full(m)) => {
msg_fn = Some(m);
None
}
Err(PushError::Closed) => Some(false),
}
})
.await;
if success {
self.inner.receiver_signal.notify();
Ok(())
} else {
Err(SendError)
}
}
#[allow(unused)]
pub(crate) fn close(&self) {
self.inner.queue.close();
self.inner.receiver_signal.notify();
self.inner.sender_signal.notify_all();
}
#[allow(unused)]
pub(crate) fn is_closed(&self) -> bool {
self.inner.queue.is_closed()
}
pub(crate) fn channel_id(&self) -> ChannelId {
ChannelId(NonZeroUsize::new(&*self.inner as *const Inner<M> as usize).unwrap())
}
}
impl<M> Clone for Sender<M> {
fn clone(&self) -> Self {
self.inner.sender_count.fetch_add(1, Ordering::Relaxed);
Self {
inner: self.inner.clone(),
}
}
}
impl<M: 'static> Drop for Sender<M> {
fn drop(&mut self) {
if self.inner.sender_count.fetch_sub(1, Ordering::Release) == 1
&& !self.inner.queue.is_closed()
{
atomic::fence(Ordering::Acquire);
self.inner.queue.close();
self.inner.receiver_signal.notify();
}
}
}
impl<M> fmt::Debug for Sender<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Address").finish_non_exhaustive()
}
}
trait MessageFn<M: Model>: Send {
fn call_once<'a>(
&mut self,
model: &'a mut M,
scheduler: &'a Scheduler<M>,
recycle_box: RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>;
}
struct MessageFnOnce<F, M> {
msg_fn: Option<F>,
_phantom: PhantomData<fn(&mut M)>,
}
impl<F, M> MessageFnOnce<F, M> {
fn new(msg_fn: F) -> Self {
Self {
msg_fn: Some(msg_fn),
_phantom: PhantomData,
}
}
}
impl<F, M: Model> MessageFn<M> for MessageFnOnce<F, M>
where
F: for<'a> FnOnce(
&'a mut M,
&'a Scheduler<M>,
RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>
+ Send,
{
fn call_once<'a>(
&mut self,
model: &'a mut M,
scheduler: &'a Scheduler<M>,
recycle_box: RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a> {
let closure = self.msg_fn.take().unwrap();
(closure)(model, scheduler, recycle_box)
}
}
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct ChannelId(NonZeroUsize);
impl fmt::Display for ChannelId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct SendError;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct RecvError;
impl error::Error for RecvError {}
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"receiving from a closed channel".fmt(f)
}
}