asynchronix 0.2.4

[Asynchronix is now NeXosim] A high performance asychronous compute framework for system simulation.
Documentation
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use recycle_box::{coerce_box, RecycleBox};

use crate::channel;
use crate::model::{InputFn, Model, ReplierFn};
use crate::util::spsc_queue;

/// Abstraction over `EventSender` and `QuerySender`.
pub(super) trait Sender<T, R>: Send {
    fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>>;
}

/// An object that can send a payload to a model.
pub(super) struct EventSender<M: 'static, F, T, S> {
    func: F,
    sender: channel::Sender<M>,
    fut_storage: Option<RecycleBox<()>>,
    _phantom_closure: PhantomData<fn(&mut M, T)>,
    _phantom_closure_marker: PhantomData<S>,
}

impl<M: Send, F, T, S> EventSender<M, F, T, S>
where
    M: Model,
    F: for<'a> InputFn<'a, M, T, S>,
    T: Send + 'static,
{
    pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
        Self {
            func,
            sender,
            fut_storage: None,
            _phantom_closure: PhantomData,
            _phantom_closure_marker: PhantomData,
        }
    }
}

impl<M: Send, F, T, S> Sender<T, ()> for EventSender<M, F, T, S>
where
    M: Model,
    F: for<'a> InputFn<'a, M, T, S> + Copy,
    T: Send + 'static,
    S: Send,
{
    fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
        let func = self.func;

        let fut = self.sender.send(move |model, scheduler, recycle_box| {
            let fut = func.call(model, arg, scheduler);

            coerce_box!(RecycleBox::recycle(recycle_box, fut))
        });

        RecycledFuture::new(&mut self.fut_storage, async move {
            fut.await.map_err(|_| SendError {})
        })
    }
}

/// An object that can send a payload to a model and retrieve a response.
pub(super) struct QuerySender<M: 'static, F, T, R, S> {
    func: F,
    sender: channel::Sender<M>,
    receiver: multishot::Receiver<R>,
    fut_storage: Option<RecycleBox<()>>,
    _phantom_closure: PhantomData<fn(&mut M, T) -> R>,
    _phantom_closure_marker: PhantomData<S>,
}

impl<M, F, T, R, S> QuerySender<M, F, T, R, S>
where
    M: Model,
    F: for<'a> ReplierFn<'a, M, T, R, S>,
    T: Send + 'static,
    R: Send + 'static,
{
    pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
        Self {
            func,
            sender,
            receiver: multishot::Receiver::new(),
            fut_storage: None,
            _phantom_closure: PhantomData,
            _phantom_closure_marker: PhantomData,
        }
    }
}

impl<M, F, T, R, S> Sender<T, R> for QuerySender<M, F, T, R, S>
where
    M: Model,
    F: for<'a> ReplierFn<'a, M, T, R, S> + Copy,
    T: Send + 'static,
    R: Send + 'static,
    S: Send,
{
    fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>> {
        let func = self.func;
        let sender = &mut self.sender;
        let reply_receiver = &mut self.receiver;
        let fut_storage = &mut self.fut_storage;

        // The previous future generated by this method should have been polled
        // to completion so a new sender should be readily available.
        let reply_sender = reply_receiver.sender().unwrap();

        let send_fut = sender.send(move |model, scheduler, recycle_box| {
            let fut = async move {
                let reply = func.call(model, arg, scheduler).await;
                reply_sender.send(reply);
            };

            coerce_box!(RecycleBox::recycle(recycle_box, fut))
        });

        RecycledFuture::new(fut_storage, async move {
            // Send the message.
            send_fut.await.map_err(|_| SendError {})?;

            // Wait until the message is processed and the reply is sent back.
            // If an error is received, it most likely means the mailbox was
            // dropped before the message was processed.
            reply_receiver.recv().await.map_err(|_| SendError {})
        })
    }
}

/// An object that can send a payload to an unbounded queue.
pub(super) struct EventStreamSender<T> {
    producer: spsc_queue::Producer<T>,
    fut_storage: Option<RecycleBox<()>>,
}

impl<T> EventStreamSender<T> {
    pub(super) fn new(producer: spsc_queue::Producer<T>) -> Self {
        Self {
            producer,
            fut_storage: None,
        }
    }
}

impl<T> Sender<T, ()> for EventStreamSender<T>
where
    T: Send + 'static,
{
    fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
        let producer = &mut self.producer;

        RecycledFuture::new(&mut self.fut_storage, async move {
            let _ = producer.push(arg);

            Ok(())
        })
    }
}

/// An object that can send a payload to a mutex-protected slot.
pub(super) struct EventSlotSender<T> {
    slot: Arc<Mutex<Option<T>>>,
    fut_storage: Option<RecycleBox<()>>,
}

impl<T> EventSlotSender<T> {
    pub(super) fn new(slot: Arc<Mutex<Option<T>>>) -> Self {
        Self {
            slot,
            fut_storage: None,
        }
    }
}

impl<T> Sender<T, ()> for EventSlotSender<T>
where
    T: Send + 'static,
{
    fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
        let slot = &*self.slot;

        RecycledFuture::new(&mut self.fut_storage, async move {
            let mut slot = slot.lock().unwrap();
            *slot = Some(arg);

            Ok(())
        })
    }
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
/// Error returned when the mailbox was closed or dropped.
pub(super) struct SendError {}

impl fmt::Display for SendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "sending message into a closed mailbox")
    }
}

impl Error for SendError {}

pub(super) struct RecycledFuture<'a, T> {
    fut: ManuallyDrop<Pin<RecycleBox<dyn Future<Output = T> + Send + 'a>>>,
    lender_box: &'a mut Option<RecycleBox<()>>,
}
impl<'a, T> RecycledFuture<'a, T> {
    pub(super) fn new<F: Future<Output = T> + Send + 'a>(
        lender_box: &'a mut Option<RecycleBox<()>>,
        fut: F,
    ) -> Self {
        let vacated_box = lender_box.take().unwrap_or_else(|| RecycleBox::new(()));
        let fut: RecycleBox<dyn Future<Output = T> + Send + 'a> =
            coerce_box!(RecycleBox::recycle(vacated_box, fut));

        Self {
            fut: ManuallyDrop::new(RecycleBox::into_pin(fut)),
            lender_box,
        }
    }
}

impl<'a, T> Drop for RecycledFuture<'a, T> {
    fn drop(&mut self) {
        // Return the box to the lender.
        //
        // Safety: taking the `fut` member is safe since it is never used again.
        *self.lender_box = Some(RecycleBox::vacate_pinned(unsafe {
            ManuallyDrop::take(&mut self.fut)
        }));
    }
}

impl<'a, T> Future for RecycledFuture<'a, T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.fut.as_mut().poll(cx)
    }
}