use std::future::Future;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use dyn_clone::DynClone;
use recycle_box::{RecycleBox, coerce_box};
use crate::channel;
use crate::channel::SendError;
use crate::model::Model;
use crate::ports::{EventSinkWriter, InputFn, ReplierFn};
use crate::util::traits::Sealed;
pub(super) trait InfallibleSender<T, R>: DynClone + Send + Sealed {
fn send(&mut self, arg: &T) -> RecycledFuture<'_, Result<R, SendError>>;
fn send_owned(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>> {
self.send(&arg)
}
}
dyn_clone::clone_trait_object!(<T, R> InfallibleSender<T, R>);
pub(super) trait Sender<T, R>: DynClone + Send {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>>;
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
self.send(&arg)
}
}
dyn_clone::clone_trait_object!(<T, R> Sender<T, R>);
impl<S, T, R> Sender<T, R> for S
where
S: InfallibleSender<T, R>,
{
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
Some(self.send(arg))
}
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
Some(self.send_owned(arg))
}
}
pub(super) struct InputSender<M, F, T, S>
where
M: 'static,
{
func: F,
sender: channel::Sender<M>,
fut_storage: Option<RecycleBox<()>>,
_phantom_closure: PhantomData<fn(&mut M, T)>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M, F, T, S> Sealed for InputSender<M, F, T, S> {}
impl<M, F, T, S> InputSender<M, F, T, S>
where
M: 'static,
{
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
Self {
func,
sender,
fut_storage: None,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M, F, T, S> InfallibleSender<T, ()> for InputSender<M, F, T, S>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Clone + Send + 'static,
S: Send,
{
fn send(&mut self, arg: &T) -> RecycledFuture<'_, Result<(), SendError>> {
InfallibleSender::send_owned(self, arg.clone())
}
fn send_owned(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let func = self.func.clone();
let fut = self.sender.send(move |model, scheduler, env, recycle_box| {
let fut = func.call(model, arg, scheduler, env);
coerce_box!(RecycleBox::recycle(recycle_box, fut))
});
RecycledFuture::new(&mut self.fut_storage, fut)
}
}
impl<M, F, T, S> Clone for InputSender<M, F, T, S>
where
M: 'static,
F: Clone,
{
fn clone(&self) -> Self {
Self {
func: self.func.clone(),
sender: self.sender.clone(),
fut_storage: None,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
pub(super) struct MapInputSender<M, C, F, T, U, S>
where
M: 'static,
{
map: Arc<C>,
func: F,
sender: channel::Sender<M>,
fut_storage: Option<RecycleBox<()>>,
_phantom_map: PhantomData<fn(T) -> U>,
_phantom_closure: PhantomData<fn(&mut M, U)>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M, C, F, T, U, S> MapInputSender<M, C, F, T, U, S>
where
M: 'static,
{
pub(super) fn new(map: C, func: F, sender: channel::Sender<M>) -> Self {
Self {
map: Arc::new(map),
func,
sender,
fut_storage: None,
_phantom_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M, C, F, T, U, S> Sealed for MapInputSender<M, C, F, T, U, S> {}
impl<M, C, F, T, U, S> InfallibleSender<T, ()> for MapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(&T) -> U + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: &T) -> RecycledFuture<'_, Result<(), SendError>> {
let func = self.func.clone();
let arg = (self.map)(arg);
let fut = self.sender.send(move |model, scheduler, env, recycle_box| {
let fut = func.call(model, arg, scheduler, env);
coerce_box!(RecycleBox::recycle(recycle_box, fut))
});
RecycledFuture::new(&mut self.fut_storage, fut)
}
}
impl<M, C, F, T, U, S> Clone for MapInputSender<M, C, F, T, U, S>
where
M: 'static,
F: Clone,
{
fn clone(&self) -> Self {
Self {
map: self.map.clone(),
func: self.func.clone(),
sender: self.sender.clone(),
fut_storage: None,
_phantom_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
pub(super) struct FilterMapInputSender<M, C, F, T, U, S>
where
M: 'static,
{
filter_map: Arc<C>,
func: F,
sender: channel::Sender<M>,
fut_storage: Option<RecycleBox<()>>,
_phantom_filter_map: PhantomData<fn(T) -> Option<U>>,
_phantom_closure: PhantomData<fn(&mut M, U)>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M, C, F, T, U, S> FilterMapInputSender<M, C, F, T, U, S>
where
M: 'static,
{
pub(super) fn new(filter_map: C, func: F, sender: channel::Sender<M>) -> Self {
Self {
filter_map: Arc::new(filter_map),
func,
sender,
fut_storage: None,
_phantom_filter_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(&T) -> Option<U> + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
(self.filter_map)(arg).map(|arg| {
let func = self.func.clone();
let fut = self.sender.send(move |model, scheduler, env, recycle_box| {
let fut = func.call(model, arg, scheduler, env);
coerce_box!(RecycleBox::recycle(recycle_box, fut))
});
RecycledFuture::new(&mut self.fut_storage, fut)
})
}
}
impl<M, C, F, T, U, S> Clone for FilterMapInputSender<M, C, F, T, U, S>
where
M: 'static,
F: Clone,
{
fn clone(&self) -> Self {
Self {
filter_map: self.filter_map.clone(),
func: self.func.clone(),
sender: self.sender.clone(),
fut_storage: None,
_phantom_filter_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
pub(super) struct EventSinkSender<T, W> {
writer: W,
fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>,
}
impl<T, W> EventSinkSender<T, W> {
pub(super) fn new(writer: W) -> Self {
Self {
writer,
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
impl<T, W> Sealed for EventSinkSender<T, W> {}
impl<T, W> InfallibleSender<T, ()> for EventSinkSender<T, W>
where
T: Clone + Send + 'static,
W: EventSinkWriter<T>,
{
fn send(&mut self, arg: &T) -> RecycledFuture<'_, Result<(), SendError>> {
InfallibleSender::send_owned(self, arg.clone())
}
fn send_owned(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let writer = &mut self.writer;
RecycledFuture::new(&mut self.fut_storage, async move {
writer.write(arg);
Ok(())
})
}
}
impl<T, W: Clone> Clone for EventSinkSender<T, W> {
fn clone(&self) -> Self {
Self {
writer: self.writer.clone(),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
pub(super) struct MapEventSinkSender<T, U, W, C>
where
C: Fn(&T) -> U,
{
writer: W,
map: Arc<C>,
fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>,
}
impl<T, U, W, C> MapEventSinkSender<T, U, W, C>
where
C: Fn(&T) -> U,
{
pub(super) fn new(map: C, writer: W) -> Self {
Self {
writer,
map: Arc::new(map),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
impl<T, U, W, C: Fn(&T) -> U> Sealed for MapEventSinkSender<T, U, W, C> {}
impl<T, U, W, C> InfallibleSender<T, ()> for MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(&T) -> U + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: &T) -> RecycledFuture<'_, Result<(), SendError>> {
let writer = &mut self.writer;
let arg = (self.map)(arg);
RecycledFuture::new(&mut self.fut_storage, async move {
writer.write(arg);
Ok(())
})
}
}
impl<T, U, W, C> Clone for MapEventSinkSender<T, U, W, C>
where
C: Fn(&T) -> U,
W: Clone,
{
fn clone(&self) -> Self {
Self {
writer: self.writer.clone(),
map: self.map.clone(),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
pub(super) struct FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(&T) -> Option<U>,
{
writer: W,
filter_map: Arc<C>,
fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>,
}
impl<T, U, W, C> FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(&T) -> Option<U>,
{
pub(super) fn new(filter_map: C, writer: W) -> Self {
Self {
writer,
filter_map: Arc::new(filter_map),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
impl<T, U, W, C> Sender<T, ()> for FilterMapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(&T) -> Option<U> + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let writer = &mut self.writer;
(self.filter_map)(arg).map(|arg| {
RecycledFuture::new(&mut self.fut_storage, async move {
writer.write(arg);
Ok(())
})
})
}
}
impl<T, U, W, C> Clone for FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(&T) -> Option<U>,
W: Clone,
{
fn clone(&self) -> Self {
Self {
writer: self.writer.clone(),
filter_map: self.filter_map.clone(),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
pub(super) struct ReplierSender<M, F, T, R, S>
where
M: Model,
{
func: F,
sender: channel::Sender<M>,
receiver: multishot::Receiver<R>,
fut_storage: Option<RecycleBox<()>>,
_phantom_closure: PhantomData<fn(&mut M, T) -> R>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M, F, T, R, S> ReplierSender<M, F, T, R, S>
where
M: Model,
{
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
Self {
func,
sender,
receiver: multishot::Receiver::new(),
fut_storage: None,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M: Model, F, T, R, S> Sealed for ReplierSender<M, F, T, R, S> {}
impl<M, F, T, R, S> InfallibleSender<T, R> for ReplierSender<M, F, T, R, S>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
T: Clone + Send + 'static,
R: Send + 'static,
S: Send,
{
fn send(&mut self, arg: &T) -> RecycledFuture<'_, Result<R, SendError>> {
InfallibleSender::send_owned(self, arg.clone())
}
fn send_owned(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>> {
let func = self.func.clone();
let sender = &mut self.sender;
let reply_receiver = &mut self.receiver;
let fut_storage = &mut self.fut_storage;
let reply_sender = reply_receiver.sender().unwrap();
let send_fut = sender.send(move |model, scheduler, env, recycle_box| {
let fut = async move {
let reply = func.call(model, arg, scheduler, env).await;
reply_sender.send(reply);
};
coerce_box!(RecycleBox::recycle(recycle_box, fut))
});
RecycledFuture::new(fut_storage, async move {
send_fut.await?;
reply_receiver.recv().await.map_err(|_| SendError)
})
}
}
impl<M, F, T, R, S> Clone for ReplierSender<M, F, T, R, S>
where
M: Model,
F: Clone,
{
fn clone(&self) -> Self {
Self {
func: self.func.clone(),
sender: self.sender.clone(),
receiver: multishot::Receiver::new(),
fut_storage: None,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
pub(super) struct MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
{
query_map: Arc<C>,
reply_map: Arc<D>,
func: F,
sender: channel::Sender<M>,
receiver: multishot::Receiver<Q>,
fut_storage: Option<RecycleBox<()>>,
_phantom_query_map: PhantomData<fn(T) -> U>,
_phantom_reply_map: PhantomData<fn(Q) -> R>,
_phantom_closure: PhantomData<fn(&mut M, U) -> Q>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M, C, D, F, T, R, U, Q, S> MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
{
pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender<M>) -> Self {
Self {
query_map: Arc::new(query_map),
reply_map: Arc::new(reply_map),
func,
sender,
receiver: multishot::Receiver::new(),
fut_storage: None,
_phantom_query_map: PhantomData,
_phantom_reply_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M: Model, C, D, F, T, R, U, Q, S> Sealed for MapReplierSender<M, C, D, F, T, R, U, Q, S> {}
impl<M, C, D, F, T, R, U, Q, S> InfallibleSender<T, R>
for MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(&T) -> U + Send + Sync,
D: Fn(Q) -> R + Send + Sync,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
R: Send + 'static,
U: Send + 'static,
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: &T) -> RecycledFuture<'_, Result<R, SendError>> {
let func = self.func.clone();
let arg = (self.query_map)(arg);
let sender = &mut self.sender;
let reply_receiver = &mut self.receiver;
let fut_storage = &mut self.fut_storage;
let reply_map = &*self.reply_map;
let reply_sender = reply_receiver.sender().unwrap();
let send_fut = sender.send(move |model, scheduler, env, recycle_box| {
let fut = async move {
let reply = func.call(model, arg, scheduler, env).await;
reply_sender.send(reply);
};
coerce_box!(RecycleBox::recycle(recycle_box, fut))
});
RecycledFuture::new(fut_storage, async move {
send_fut.await?;
reply_receiver
.recv()
.await
.map_err(|_| SendError)
.map(reply_map)
})
}
}
impl<M, C, D, F, T, R, U, Q, S> Clone for MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
F: Clone,
{
fn clone(&self) -> Self {
Self {
query_map: self.query_map.clone(),
reply_map: self.reply_map.clone(),
func: self.func.clone(),
sender: self.sender.clone(),
receiver: multishot::Receiver::new(),
fut_storage: None,
_phantom_query_map: PhantomData,
_phantom_reply_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
pub(super) struct FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
{
query_filter_map: Arc<C>,
reply_map: Arc<D>,
func: F,
sender: channel::Sender<M>,
receiver: multishot::Receiver<Q>,
fut_storage: Option<RecycleBox<()>>,
_phantom_query_map: PhantomData<fn(T) -> U>,
_phantom_reply_map: PhantomData<fn(Q) -> R>,
_phantom_closure: PhantomData<fn(&mut M, U) -> Q>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M, C, D, F, T, R, U, Q, S> FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
{
pub(super) fn new(
query_filter_map: C,
reply_map: D,
func: F,
sender: channel::Sender<M>,
) -> Self {
Self {
query_filter_map: Arc::new(query_filter_map),
reply_map: Arc::new(reply_map),
func,
sender,
receiver: multishot::Receiver::new(),
fut_storage: None,
_phantom_query_map: PhantomData,
_phantom_reply_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(&T) -> Option<U> + Send + Sync,
D: Fn(Q) -> R + Send + Sync,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
R: Send + 'static,
U: Send + 'static,
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
(self.query_filter_map)(arg).map(|arg| {
let func = self.func.clone();
let sender = &mut self.sender;
let reply_receiver = &mut self.receiver;
let fut_storage = &mut self.fut_storage;
let reply_map = &*self.reply_map;
let reply_sender = reply_receiver.sender().unwrap();
let send_fut = sender.send(move |model, scheduler, env, recycle_box| {
let fut = async move {
let reply = func.call(model, arg, scheduler, env).await;
reply_sender.send(reply);
};
coerce_box!(RecycleBox::recycle(recycle_box, fut))
});
RecycledFuture::new(fut_storage, async move {
send_fut.await?;
reply_receiver
.recv()
.await
.map_err(|_| SendError)
.map(reply_map)
})
})
}
}
impl<M, C, D, F, T, R, U, Q, S> Clone for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
F: Clone,
{
fn clone(&self) -> Self {
Self {
query_filter_map: self.query_filter_map.clone(),
reply_map: self.reply_map.clone(),
func: self.func.clone(),
sender: self.sender.clone(),
receiver: multishot::Receiver::new(),
fut_storage: None,
_phantom_query_map: PhantomData,
_phantom_reply_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
pub(super) struct RecycledFuture<'a, T> {
fut: ManuallyDrop<Pin<RecycleBox<dyn Future<Output = T> + Send + 'a>>>,
lender_box: &'a mut Option<RecycleBox<()>>,
}
impl<'a, T> RecycledFuture<'a, T> {
pub(super) fn new<F: Future<Output = T> + Send + 'a>(
lender_box: &'a mut Option<RecycleBox<()>>,
fut: F,
) -> Self {
let vacated_box = lender_box.take().unwrap_or_else(|| RecycleBox::new(()));
let fut: RecycleBox<dyn Future<Output = T> + Send + 'a> =
coerce_box!(RecycleBox::recycle(vacated_box, fut));
Self {
fut: ManuallyDrop::new(RecycleBox::into_pin(fut)),
lender_box,
}
}
}
impl<T> Drop for RecycledFuture<'_, T> {
fn drop(&mut self) {
*self.lender_box = Some(RecycleBox::vacate_pinned(unsafe {
ManuallyDrop::take(&mut self.fut)
}));
}
}
impl<T> Future for RecycledFuture<'_, T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.fut.as_mut().poll(cx)
}
}