mod broadcaster;
mod sender;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
use std::sync::Mutex;
use serde::{Deserialize, Serialize};
use crate::macros::scoped_thread_local::scoped_thread_local;
use crate::model::Model;
use crate::ports::EventSinkWriter;
use crate::ports::{InputFn, ReplierFn};
use crate::simulation::Address;
use crate::util::cached_rw_lock::CachedRwLock;
use crate::util::traits::Sealed;
use crate::util::unwrap_or_throw::UnwrapOrThrow;
use broadcaster::{EventBroadcaster, QueryBroadcaster};
use sender::{FilterMapReplierSender, InfallibleSender};
use self::sender::{
EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender,
MapEventSinkSender, MapInputSender, MapReplierSender, ReplierSender,
};
// Scoped registry through which ports survive a serialization round trip: the
// `Serialize` impls in this file push a clone of each port at the back of the
// queue and the `Deserialize` impls pop ports from the front, so ports must be
// restored in the order in which they were saved.
scoped_thread_local!(pub(crate) static PORT_REG: PortsReg);
/// FIFO queue of type-erased ports, guarded by a mutex.
pub(crate) type PortsReg = Mutex<VecDeque<Box<dyn Any>>>;
/// An output port that forwards events of type `T` to its connected senders.
///
/// Connections are kept in an `EventBroadcaster` wrapped in a `CachedRwLock`.
/// The derived `Clone` clones that wrapper; whether clones share their
/// connections depends on `CachedRwLock`'s own `Clone` impl (not shown here).
#[derive(Clone)]
pub struct Output<T: Clone + Send + 'static> {
/// Connected senders that `send` broadcasts to.
broadcaster: CachedRwLock<EventBroadcaster<T>>,
}
impl<T: Clone + Send + 'static> Output<T> {
    /// Creates an `Output` port without any connection.
    pub fn new() -> Self {
        Default::default()
    }

    /// Connects this port to the input `input` of the model at `address`.
    ///
    /// # Panics
    ///
    /// Panics if the write lock on the broadcaster cannot be acquired.
    pub fn connect<M, F, S>(&mut self, input: F, address: impl Into<Address<M>>)
    where
        M: Model,
        F: for<'a> InputFn<'a, M, T, S> + Clone,
        S: Send + 'static,
    {
        let target: Address<M> = address.into();
        let port = Box::new(InputSender::new(input, target.0));
        self.broadcaster.write().unwrap().add(port);
    }

    /// Connects this port to an event sink.
    ///
    /// # Panics
    ///
    /// Panics if the write lock on the broadcaster cannot be acquired.
    pub fn connect_sink<S: EventSinkWriter<T>>(&mut self, sink: S) {
        let port = Box::new(EventSinkSender::new(sink));
        self.broadcaster.write().unwrap().add(port);
    }

    /// Connects this port to the input `input` of the model at `address`,
    /// converting each event with `map` before it is delivered.
    ///
    /// `map` receives the event by reference and returns the value of type `U`
    /// expected by the input.
    ///
    /// # Panics
    ///
    /// Panics if the write lock on the broadcaster cannot be acquired.
    pub fn map_connect<M, C, F, U, S>(&mut self, map: C, input: F, address: impl Into<Address<M>>)
    where
        M: Model,
        C: Fn(&T) -> U + Send + Sync + 'static,
        F: for<'a> InputFn<'a, M, U, S> + Clone,
        U: Send + 'static,
        S: Send + 'static,
    {
        let target: Address<M> = address.into();
        let port = Box::new(MapInputSender::new(map, input, target.0));
        self.broadcaster.write().unwrap().add(port);
    }

    /// Connects this port to an event sink, converting each event with `map`
    /// before it is written.
    ///
    /// # Panics
    ///
    /// Panics if the write lock on the broadcaster cannot be acquired.
    pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: S)
    where
        C: Fn(&T) -> U + Send + Sync + 'static,
        U: Send + 'static,
        S: EventSinkWriter<U>,
    {
        let port = Box::new(MapEventSinkSender::new(map, sink));
        self.broadcaster.write().unwrap().add(port);
    }

    /// Connects this port to the input `input` of the model at `address`
    /// through the `filter_map` closure.
    ///
    /// `filter_map` receives the event by reference and returns an
    /// `Option<U>`; how `None` is treated is decided by `FilterMapInputSender`
    /// (presumably the event is not delivered on this connection).
    ///
    /// # Panics
    ///
    /// Panics if the write lock on the broadcaster cannot be acquired.
    pub fn filter_map_connect<M, C, F, U, S>(
        &mut self,
        filter_map: C,
        input: F,
        address: impl Into<Address<M>>,
    ) where
        M: Model,
        C: Fn(&T) -> Option<U> + Send + Sync + 'static,
        F: for<'a> InputFn<'a, M, U, S> + Clone,
        U: Send + 'static,
        S: Send + 'static,
    {
        let target: Address<M> = address.into();
        let port = Box::new(FilterMapInputSender::new(filter_map, input, target.0));
        self.broadcaster.write().unwrap().add(port);
    }

    /// Connects this port to an event sink through the `filter_map` closure.
    ///
    /// `filter_map` receives the event by reference and returns an
    /// `Option<U>`; how `None` is treated is decided by
    /// `FilterMapEventSinkSender` (presumably nothing is written to the sink).
    ///
    /// # Panics
    ///
    /// Panics if the write lock on the broadcaster cannot be acquired.
    pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: S)
    where
        C: Fn(&T) -> Option<U> + Send + Sync + 'static,
        U: Send + 'static,
        S: EventSinkWriter<U>,
    {
        let port = Box::new(FilterMapEventSinkSender::new(filter_map, sink));
        self.broadcaster.write().unwrap().add(port);
    }

    /// Broadcasts `arg` through the underlying `EventBroadcaster`.
    ///
    /// A broadcast error is handed to `unwrap_or_throw` rather than returned.
    pub async fn send(&mut self, arg: T) {
        self.broadcaster
            .write_scratchpad()
            .unwrap()
            .broadcast(arg)
            .await
            .unwrap_or_throw();
    }
}
/// The default `Output` wraps a default (empty) `EventBroadcaster`.
impl<T: Clone + Send + 'static> Default for Output<T> {
    fn default() -> Self {
        let inner = EventBroadcaster::default();
        Self {
            broadcaster: CachedRwLock::new(inner),
        }
    }
}
/// Debug output reports only how many ports are connected.
impl<T: Clone + Send + 'static> fmt::Debug for Output<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let connected = self.broadcaster.read_unsync().len();
        write!(f, "Output ({} connected ports)", connected)
    }
}
/// Serialization does not encode the port itself: a clone of the port is
/// appended to `PORT_REG` (when `PORT_REG.map` runs the closure) and only a
/// unit value is written to the serializer.
impl<T: Clone + Send> Serialize for Output<T> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        PORT_REG.map(|registry| {
            let mut queue = registry.lock().unwrap();
            queue.push_back(Box::new(self.clone()));
        });
        serializer.serialize_unit()
    }
}
/// Restores an `Output` from the front of `PORT_REG`.
///
/// The serialized form is a unit placeholder; the actual port is the next
/// entry of the registry queue. When `PORT_REG.map` yields `None` (presumably
/// because no registry is installed), a default, unconnected port is returned.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned, if the registry holds no more
/// entries, or if the next entry is not an `Output<T>`.
impl<'de, T: Clone + Send> Deserialize<'de> for Output<T> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Consume the unit written by `Serialize` so that formats which encode
        // a unit as actual data (e.g. `null`) stay in sync with the stream.
        <()>::deserialize(deserializer)?;

        let restored = PORT_REG.map(|reg| {
            // The lock is released at the end of this statement, before the
            // downcast below.
            let entry = reg
                .lock()
                .expect("port registry mutex poisoned")
                .pop_front()
                .expect("port registry exhausted: more ports deserialized than serialized");
            *entry
                .downcast::<Self>()
                .expect("port registry entry is not an `Output` of the expected type")
        });

        Ok(restored.unwrap_or_default())
    }
}
/// A requestor port that sends queries of type `T` to its connected senders
/// and collects replies of type `R`.
///
/// Connections are kept in a `QueryBroadcaster` wrapped in a `CachedRwLock`.
pub struct Requestor<T: Clone + Send + 'static, R: Send + 'static> {
/// Connected senders that `send` broadcasts to.
broadcaster: CachedRwLock<QueryBroadcaster<T, R>>,
}
impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
    /// Creates a `Requestor` port without any connection.
    pub fn new() -> Self {
        Default::default()
    }

    /// Connects this port to the replier `replier` of the model at `address`.
    ///
    /// # Panics
    ///
    /// Panics if the write lock on the broadcaster cannot be acquired.
    pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>)
    where
        M: Model,
        F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
        S: Send + 'static,
    {
        let target: Address<M> = address.into();
        let port = Box::new(ReplierSender::new(replier, target.0));
        self.broadcaster.write().unwrap().add(port);
    }

    /// Connects this port to the replier `replier` of the model at `address`,
    /// converting both the query and the reply.
    ///
    /// `query_map` receives the query by reference and returns the value of
    /// type `U` expected by the replier; `reply_map` converts the replier's
    /// reply of type `Q` into `R`.
    ///
    /// # Panics
    ///
    /// Panics if the write lock on the broadcaster cannot be acquired.
    pub fn map_connect<M, C, D, F, U, Q, S>(
        &mut self,
        query_map: C,
        reply_map: D,
        replier: F,
        address: impl Into<Address<M>>,
    ) where
        M: Model,
        C: Fn(&T) -> U + Send + Sync + 'static,
        D: Fn(Q) -> R + Send + Sync + 'static,
        F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
        U: Send + 'static,
        Q: Send + 'static,
        S: Send + 'static,
    {
        let target: Address<M> = address.into();
        let port = Box::new(MapReplierSender::new(query_map, reply_map, replier, target.0));
        self.broadcaster.write().unwrap().add(port);
    }

    /// Connects this port to the replier `replier` of the model at `address`
    /// through a filtering query closure and a reply conversion.
    ///
    /// `query_filter_map` receives the query by reference and returns an
    /// `Option<U>`; how `None` is treated is decided by
    /// `FilterMapReplierSender` (presumably the replier is not queried).
    /// `reply_map` converts the replier's reply of type `Q` into `R`.
    ///
    /// # Panics
    ///
    /// Panics if the write lock on the broadcaster cannot be acquired.
    pub fn filter_map_connect<M, C, D, F, U, Q, S>(
        &mut self,
        query_filter_map: C,
        reply_map: D,
        replier: F,
        address: impl Into<Address<M>>,
    ) where
        M: Model,
        C: Fn(&T) -> Option<U> + Send + Sync + 'static,
        D: Fn(Q) -> R + Send + Sync + 'static,
        F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
        U: Send + 'static,
        Q: Send + 'static,
        S: Send + 'static,
    {
        let target: Address<M> = address.into();
        let port = Box::new(FilterMapReplierSender::new(
            query_filter_map,
            reply_map,
            replier,
            target.0,
        ));
        self.broadcaster.write().unwrap().add(port);
    }

    /// Broadcasts the query `arg` through the underlying `QueryBroadcaster`
    /// and returns an iterator over the replies.
    ///
    /// A broadcast error is handed to `unwrap_or_throw` rather than returned.
    pub async fn send(&mut self, arg: T) -> impl Iterator<Item = R> + '_ {
        let broadcaster = self.broadcaster.write_scratchpad().unwrap();
        let replies = broadcaster.broadcast(arg).await;
        replies.unwrap_or_throw()
    }
}
/// Hand-written so that cloning does not require `R: Clone`, which a derived
/// impl would demand.
impl<T: Clone + Send, R: Send> Clone for Requestor<T, R> {
    fn clone(&self) -> Self {
        let broadcaster = Clone::clone(&self.broadcaster);
        Self { broadcaster }
    }
}
/// The default `Requestor` wraps a default (empty) `QueryBroadcaster`.
impl<T: Clone + Send + 'static, R: Send + 'static> Default for Requestor<T, R> {
    fn default() -> Self {
        let inner = QueryBroadcaster::default();
        Self {
            broadcaster: CachedRwLock::new(inner),
        }
    }
}
/// Debug output reports only how many ports are connected.
impl<T: Clone + Send + 'static, R: Send + 'static> fmt::Debug for Requestor<T, R> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let connected = self.broadcaster.read_unsync().len();
        write!(f, "Requestor ({} connected ports)", connected)
    }
}
/// Serialization does not encode the port itself: a clone of the port is
/// appended to `PORT_REG` (when `PORT_REG.map` runs the closure) and only a
/// unit value is written to the serializer.
impl<T: Clone + Send, R: Send> Serialize for Requestor<T, R> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        PORT_REG.map(|registry| {
            let mut queue = registry.lock().unwrap();
            queue.push_back(Box::new(self.clone()));
        });
        serializer.serialize_unit()
    }
}
/// Restores a `Requestor` from the front of `PORT_REG`.
///
/// The serialized form is a unit placeholder; the actual port is the next
/// entry of the registry queue. When `PORT_REG.map` yields `None` (presumably
/// because no registry is installed), a default, unconnected port is returned.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned, if the registry holds no more
/// entries, or if the next entry is not a `Requestor<T, R>`.
impl<'de, T: Clone + Send, R: Send> Deserialize<'de> for Requestor<T, R> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Consume the unit written by `Serialize` so that formats which encode
        // a unit as actual data (e.g. `null`) stay in sync with the stream.
        <()>::deserialize(deserializer)?;

        let restored = PORT_REG.map(|reg| {
            // The lock is released at the end of this statement, before the
            // downcast below.
            let entry = reg
                .lock()
                .expect("port registry mutex poisoned")
                .pop_front()
                .expect("port registry exhausted: more ports deserialized than serialized");
            *entry
                .downcast::<Self>()
                .expect("port registry entry is not a `Requestor` of the expected type")
        });

        Ok(restored.unwrap_or_default())
    }
}
/// A requestor port bound to a single sender: `send` yields exactly one reply
/// of type `R` for each query of type `T`.
pub struct UniRequestor<T: Clone + Send + 'static, R: Send + 'static> {
/// The one sender through which queries are sent.
sender: Box<dyn InfallibleSender<T, R>>,
}
impl<T: Clone + Send + 'static, R: Send + 'static> UniRequestor<T, R> {
    /// Creates a `UniRequestor` connected to the replier `replier` of the
    /// model at `address`.
    pub fn new<M, F, S>(replier: F, address: impl Into<Address<M>>) -> Self
    where
        M: Model,
        F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
        S: Send + 'static,
    {
        let target: Address<M> = address.into();
        Self {
            sender: Box::new(ReplierSender::new(replier, target.0)),
        }
    }

    /// Creates a `UniRequestor` connected to the replier `replier` of the
    /// model at `address`, converting both the query and the reply.
    ///
    /// `query_map` receives the query by reference and returns the value of
    /// type `U` expected by the replier; `reply_map` converts the replier's
    /// reply of type `Q` into `R`.
    pub fn with_map<M, C, D, F, U, Q, S>(
        query_map: C,
        reply_map: D,
        replier: F,
        address: impl Into<Address<M>>,
    ) -> Self
    where
        M: Model,
        C: Fn(&T) -> U + Send + Sync + 'static,
        D: Fn(Q) -> R + Send + Sync + 'static,
        F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
        U: Send + 'static,
        Q: Send + 'static,
        S: Send + 'static,
    {
        let target: Address<M> = address.into();
        Self {
            sender: Box::new(MapReplierSender::new(query_map, reply_map, replier, target.0)),
        }
    }

    /// Sends the query `arg` through the sender and returns its reply.
    ///
    /// A send error is handed to `unwrap_or_throw` rather than returned.
    pub async fn send(&mut self, arg: T) -> R {
        let reply = self.sender.send_owned(arg).await;
        reply.unwrap_or_throw()
    }
}
/// Hand-written so that cloning does not require `R: Clone`, which a derived
/// impl would demand; the boxed sender is cloned.
impl<T: Clone + Send, R: Send> Clone for UniRequestor<T, R> {
    fn clone(&self) -> Self {
        let sender = Clone::clone(&self.sender);
        Self { sender }
    }
}
/// Debug output is the bare type name; the boxed sender is not inspected.
impl<T: Clone + Send + 'static, R: Send + 'static> fmt::Debug for UniRequestor<T, R> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("UniRequestor")
    }
}
/// Serialization does not encode the port itself: a clone of the port is
/// appended to `PORT_REG` (when `PORT_REG.map` runs the closure) and only a
/// unit value is written to the serializer.
impl<T: Clone + Send, R: Send> Serialize for UniRequestor<T, R> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        PORT_REG.map(|registry| {
            let mut queue = registry.lock().unwrap();
            queue.push_back(Box::new(self.clone()));
        });
        serializer.serialize_unit()
    }
}
/// Restores a `UniRequestor` from the front of `PORT_REG`.
///
/// The serialized form is a unit placeholder; the actual port is the next
/// entry of the registry queue. When `PORT_REG.map` yields `None` (presumably
/// because no registry is installed), the returned port wraps a `FailSender`,
/// which panics as soon as a query is sent through it.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned, if the registry holds no more
/// entries, or if the next entry is not a `UniRequestor<T, R>`.
impl<'de, T: Clone + Send, R: Send> Deserialize<'de> for UniRequestor<T, R> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Consume the unit written by `Serialize` so that formats which encode
        // a unit as actual data (e.g. `null`) stay in sync with the stream.
        <()>::deserialize(deserializer)?;

        let restored = PORT_REG.map(|reg| {
            // The lock is released at the end of this statement, before the
            // downcast below.
            let entry = reg
                .lock()
                .expect("port registry mutex poisoned")
                .pop_front()
                .expect("port registry exhausted: more ports deserialized than serialized");
            *entry
                .downcast::<Self>()
                .expect("port registry entry is not a `UniRequestor` of the expected type")
        });

        // Build the placeholder lazily: it is only needed when no port could
        // be taken from the registry.
        Ok(restored.unwrap_or_else(|| Self {
            sender: Box::new(FailSender),
        }))
    }
}
/// Placeholder sender that panics whenever it is used.
///
/// It is installed by `UniRequestor`'s `Deserialize` impl when `PORT_REG.map`
/// yields `None`, i.e. when no port could be restored from the registry.
#[derive(Clone)]
struct FailSender;
// Marker impl; presumably required because `InfallibleSender` is sealed.
impl Sealed for FailSender {}
/// `FailSender` has no real connection, so both methods panic with a message
/// explaining why instead of returning a future.
impl<T, R> InfallibleSender<T, R> for FailSender {
    /// Always panics: there is no connection to send through.
    fn send(&mut self, _: &T) -> sender::RecycledFuture<'_, Result<R, crate::channel::SendError>> {
        panic!(
            "cannot send through a `UniRequestor` placeholder that was deserialized \
             without a port registry entry"
        );
    }

    /// Always panics: there is no connection to send through.
    fn send_owned(
        &mut self,
        _: T,
    ) -> sender::RecycledFuture<'_, Result<R, crate::channel::SendError>> {
        panic!(
            "cannot send through a `UniRequestor` placeholder that was deserialized \
             without a port registry entry"
        );
    }
}