mod broadcaster;
mod sender;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use futures_channel::oneshot;
use serde::Serialize;
use serde::de::DeserializeOwned;
use crate::model::{Message, Model};
use crate::path::Path;
use crate::ports::InputFn;
use crate::simulation::{
Address, DuplicateEventSourceError, DuplicateQuerySourceError, EventId, QueryId, SimInit,
};
use crate::util::unwrap_or_throw::UnwrapOrThrow;
pub(crate) use broadcaster::ReplyIterator;
use broadcaster::{EventBroadcaster, QueryBroadcaster};
use sender::{
FilterMapInputSender, FilterMapReplierSender, InputSender, MapInputSender, MapReplierSender,
ReplierSender,
};
use super::ReplierFn;
/// A source of events of type `T`.
///
/// The source owns an [`EventBroadcaster`] to which one sender is added for
/// every connection made with the `connect`, `map_connect` and
/// `filter_map_connect` methods.
pub struct EventSource<T>
where
    T: Serialize + DeserializeOwned + Clone + Send + 'static,
{
    /// Set of senders that receive every event emitted by this source.
    broadcaster: EventBroadcaster<T>,
}
impl<T> EventSource<T>
where
    T: Serialize + DeserializeOwned + Clone + Send + 'static,
{
    /// Creates a new event source from its [`Default`] implementation.
    pub fn new() -> Self {
        Default::default()
    }

    /// Connects this source to the `input` method of the model at `address`.
    ///
    /// An [`InputSender`] built from `input` and the address is added to the
    /// broadcaster; the source is returned so that calls can be chained.
    pub fn connect<M, F, S>(mut self, input: F, address: impl Into<Address<M>>) -> Self
    where
        M: Model,
        F: for<'a> InputFn<'a, M, T, S> + Clone + Sync,
        S: Send + Sync + 'static,
    {
        let target = address.into().0;
        let boxed_sender = Box::new(InputSender::new(input, target));
        self.broadcaster.add(boxed_sender);

        self
    }

    /// Connects this source to the `input` method of the model at `address`,
    /// converting events with `map`.
    ///
    /// `map` takes a reference to the event (`&T`) and produces the value of
    /// type `U` accepted by `input`. Both are handed to a
    /// [`MapInputSender`], which is added to the broadcaster. The source is
    /// returned so that calls can be chained.
    pub fn map_connect<M, C, F, U, S>(
        mut self,
        map: C,
        input: F,
        address: impl Into<Address<M>>,
    ) -> Self
    where
        M: Model,
        C: for<'a> Fn(&'a T) -> U + Send + Sync + 'static,
        F: for<'a> InputFn<'a, M, U, S> + Sync + Clone,
        U: Send + 'static,
        S: Send + Sync + 'static,
    {
        let target = address.into().0;
        let boxed_sender = Box::new(MapInputSender::new(map, input, target));
        self.broadcaster.add(boxed_sender);

        self
    }

    /// Connects this source to the `input` method of the model at `address`,
    /// converting and filtering events with `map`.
    ///
    /// `map` takes a reference to the event (`&T`) and returns an
    /// `Option<U>`. Both `map` and `input` are handed to a
    /// [`FilterMapInputSender`], which is added to the broadcaster;
    /// presumably events mapped to `None` are not forwarded (confirm against
    /// `FilterMapInputSender`). The source is returned so that calls can be
    /// chained.
    pub fn filter_map_connect<M, C, F, U, S>(
        mut self,
        map: C,
        input: F,
        address: impl Into<Address<M>>,
    ) -> Self
    where
        M: Model,
        C: for<'a> Fn(&'a T) -> Option<U> + Send + Sync + 'static,
        F: for<'a> InputFn<'a, M, U, S> + Clone + Sync,
        U: Send + 'static,
        S: Send + Sync + 'static,
    {
        let target = address.into().0;
        let boxed_sender = Box::new(FilterMapInputSender::new(map, input, target));
        self.broadcaster.add(boxed_sender);

        self
    }

    /// Hands this source over to `sim_init` through
    /// `SimInit::link_event_source` and returns the resulting [`EventId`].
    pub fn register(self, sim_init: &mut SimInit) -> EventId<T> {
        let event_id = sim_init.link_event_source(self);

        event_id
    }

    /// Returns a future that broadcasts `arg` to the connected senders.
    ///
    /// The broadcast future is created right away, so the returned future
    /// does not borrow `self`. Once awaited, the broadcast result is passed
    /// through `unwrap_or_throw`.
    pub(crate) fn event_future(&self, arg: T) -> impl Future<Output = ()> + use<T> {
        let broadcast = self.broadcaster.broadcast(arg);

        async move {
            broadcast.await.unwrap_or_throw();
        }
    }
}
impl<T> EventSource<T>
where
    T: Message + Serialize + DeserializeOwned + Clone + Send + 'static,
{
    /// Binds this event source to `path` in `sim_init`.
    ///
    /// This delegates to `SimInit::bind_event_source` and is only available
    /// when `T` implements [`Message`].
    ///
    /// # Errors
    ///
    /// Forwards the [`DuplicateEventSourceError`] returned by `sim_init`;
    /// judging by the type name this presumably signals that `path` is
    /// already in use (confirm against `SimInit`).
    pub fn bind_endpoint(
        self,
        sim_init: &mut SimInit,
        path: impl Into<Path>,
    ) -> Result<(), DuplicateEventSourceError<T>> {
        let endpoint_path = path.into();

        sim_init.bind_event_source(self, endpoint_path)
    }
}
impl<T> EventSource<T>
where
    T: Serialize + DeserializeOwned + Clone + Send + 'static,
{
    /// Binds this event source to `path` in `sim_init` without requiring `T`
    /// to implement `Message`.
    ///
    /// This delegates to `SimInit::bind_event_source_raw`.
    ///
    /// # Errors
    ///
    /// Forwards the [`DuplicateEventSourceError`] returned by `sim_init`;
    /// judging by the type name this presumably signals that `path` is
    /// already in use (confirm against `SimInit`).
    pub fn bind_endpoint_raw(
        self,
        sim_init: &mut SimInit,
        path: impl Into<Path>,
    ) -> Result<(), DuplicateEventSourceError<T>> {
        let endpoint_path = path.into();

        sim_init.bind_event_source_raw(self, endpoint_path)
    }
}
impl<T> Default for EventSource<T>
where
    T: Serialize + DeserializeOwned + Clone + Send + 'static,
{
    /// Builds an event source around a default-constructed broadcaster.
    fn default() -> Self {
        let broadcaster = EventBroadcaster::default();

        Self { broadcaster }
    }
}
impl<T> fmt::Debug for EventSource<T>
where
    T: Serialize + DeserializeOwned + Clone + Send + 'static,
{
    /// Writes a one-line summary that reports the length of the broadcaster
    /// as the number of connected ports.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let connected = self.broadcaster.len();

        f.write_fmt(format_args!(
            "Event source ({} connected ports)",
            connected
        ))
    }
}
/// A source of queries carrying a request of type `T` and collecting
/// replies of type `R`.
///
/// The source owns a [`QueryBroadcaster`] to which one sender is added for
/// every connection made with the `connect`, `map_connect` and
/// `filter_map_connect` methods.
pub struct QuerySource<T, R>
where
    T: Serialize + DeserializeOwned + Clone + Send + 'static,
    R: Send + 'static,
{
    /// Set of senders that receive every query emitted by this source.
    broadcaster: QueryBroadcaster<T, R>,
}
impl<T, R> QuerySource<T, R>
where
    T: Serialize + DeserializeOwned + Clone + Send + 'static,
    R: Send + 'static,
{
    /// Creates a new query source from its [`Default`] implementation.
    pub fn new() -> Self {
        Default::default()
    }

    /// Connects this source to the `replier` method of the model at
    /// `address`.
    ///
    /// A [`ReplierSender`] built from `replier` and the address is added to
    /// the broadcaster; the source is returned so that calls can be chained.
    pub fn connect<M, F, S>(mut self, replier: F, address: impl Into<Address<M>>) -> Self
    where
        M: Model,
        F: for<'a> ReplierFn<'a, M, T, R, S> + Clone + Sync,
        S: Send + Sync + 'static,
    {
        let target = address.into().0;
        let boxed_sender = Box::new(ReplierSender::new(replier, target));
        self.broadcaster.add(boxed_sender);

        self
    }

    /// Connects this source to the `replier` method of the model at
    /// `address`, converting both the query and the reply.
    ///
    /// `query_map` takes a reference to the query (`&T`) and produces the
    /// value of type `U` accepted by `replier`; `reply_map` turns the
    /// replier's reply of type `Q` into `R`. All three are handed to a
    /// [`MapReplierSender`], which is added to the broadcaster. The source
    /// is returned so that calls can be chained.
    pub fn map_connect<M, C, D, F, U, Q, S>(
        mut self,
        query_map: C,
        reply_map: D,
        replier: F,
        address: impl Into<Address<M>>,
    ) -> Self
    where
        M: Model,
        C: for<'a> Fn(&'a T) -> U + Send + Sync + 'static,
        D: Fn(Q) -> R + Send + Sync + 'static,
        F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync,
        U: Send + 'static,
        Q: Send + 'static,
        S: Send + Sync + 'static,
    {
        let target = address.into().0;
        let boxed_sender = Box::new(MapReplierSender::new(query_map, reply_map, replier, target));
        self.broadcaster.add(boxed_sender);

        self
    }

    /// Connects this source to the `replier` method of the model at
    /// `address`, converting and filtering the query and converting the
    /// reply.
    ///
    /// `query_filter_map` takes a reference to the query (`&T`) and returns
    /// an `Option<U>`; `reply_map` turns the replier's reply of type `Q`
    /// into `R`. All three are handed to a [`FilterMapReplierSender`], which
    /// is added to the broadcaster; presumably queries mapped to `None` are
    /// not forwarded (confirm against `FilterMapReplierSender`). The source
    /// is returned so that calls can be chained.
    pub fn filter_map_connect<M, C, D, F, U, Q, S>(
        mut self,
        query_filter_map: C,
        reply_map: D,
        replier: F,
        address: impl Into<Address<M>>,
    ) -> Self
    where
        M: Model,
        C: for<'a> Fn(&'a T) -> Option<U> + Send + Sync + 'static,
        D: Fn(Q) -> R + Send + Sync + 'static,
        F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync,
        U: Send + 'static,
        Q: Send + 'static,
        S: Send + Sync + 'static,
    {
        let target = address.into().0;
        let boxed_sender = Box::new(FilterMapReplierSender::new(
            query_filter_map,
            reply_map,
            replier,
            target,
        ));
        self.broadcaster.add(boxed_sender);

        self
    }

    /// Hands this source over to `sim_init` through
    /// `SimInit::link_query_source` and returns the resulting [`QueryId`].
    pub fn register(self, sim_init: &mut SimInit) -> QueryId<T, R> {
        let query_id = sim_init.link_query_source(self);

        query_id
    }

    /// Returns a boxed future that broadcasts `arg` to the connected senders
    /// and forwards the collected replies.
    ///
    /// The broadcast future is created right away. Once awaited, its result
    /// is passed through `unwrap_or_throw`; the replies are then sent
    /// through `replier` when one was supplied and dropped otherwise.
    pub(crate) fn query_future(
        &self,
        arg: T,
        replier: Option<ReplyWriter<R>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        let broadcast = self.broadcaster.broadcast(arg);

        Box::pin(async move {
            let replies = broadcast.await.unwrap_or_throw();
            if let Some(writer) = replier {
                writer.send(replies);
            }
        })
    }
}
impl<T, R> QuerySource<T, R>
where
    T: Message + Serialize + DeserializeOwned + Clone + Send + 'static,
    R: Message + Serialize + Send + 'static,
{
    /// Binds this query source to `path` in `sim_init`.
    ///
    /// This delegates to `SimInit::bind_query_source` and is only available
    /// when both `T` and `R` implement [`Message`].
    ///
    /// # Errors
    ///
    /// Forwards the [`DuplicateQuerySourceError`] returned by `sim_init`;
    /// judging by the type name this presumably signals that `path` is
    /// already in use (confirm against `SimInit`).
    pub fn bind_endpoint(
        self,
        sim_init: &mut SimInit,
        path: impl Into<Path>,
    ) -> Result<(), DuplicateQuerySourceError<Self>> {
        let endpoint_path = path.into();

        sim_init.bind_query_source(self, endpoint_path)
    }
}
impl<T, R> QuerySource<T, R>
where
    T: Serialize + DeserializeOwned + Clone + Send + 'static,
    R: Serialize + Send + 'static,
{
    /// Binds this query source to `path` in `sim_init` without requiring `T`
    /// or `R` to implement `Message`.
    ///
    /// This delegates to `SimInit::bind_query_source_raw`.
    ///
    /// NOTE(review): this method takes `path` before `sim_init`, whereas
    /// `QuerySource::bind_endpoint` and both `EventSource` bind methods take
    /// `sim_init` first. The order is left untouched here because swapping
    /// it would break existing callers; consider aligning it at the next
    /// breaking release.
    ///
    /// # Errors
    ///
    /// Forwards the [`DuplicateQuerySourceError`] returned by `sim_init`;
    /// judging by the type name this presumably signals that `path` is
    /// already in use (confirm against `SimInit`).
    pub fn bind_endpoint_raw(
        self,
        path: impl Into<Path>,
        sim_init: &mut SimInit,
    ) -> Result<(), DuplicateQuerySourceError<Self>> {
        let endpoint_path = path.into();

        sim_init.bind_query_source_raw(self, endpoint_path)
    }
}
impl<T, R> Default for QuerySource<T, R>
where
    T: Serialize + DeserializeOwned + Clone + Send + 'static,
    R: Send + 'static,
{
    /// Builds a query source around a default-constructed broadcaster.
    fn default() -> Self {
        let broadcaster = QueryBroadcaster::default();

        Self { broadcaster }
    }
}
impl<T, R> fmt::Debug for QuerySource<T, R>
where
    T: Serialize + DeserializeOwned + Clone + Send + 'static,
    R: Send + 'static,
{
    /// Writes a one-line summary that reports the length of the broadcaster
    /// as the number of connected ports.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let connected = self.broadcaster.len();

        f.write_fmt(format_args!(
            "Query source ({} connected ports)",
            connected
        ))
    }
}
/// Receiving half of the one-shot channel created by `query_replier`.
///
/// It wraps a `oneshot::Receiver` carrying the `ReplyIterator<R>` sent by the
/// matching `ReplyWriter`.
#[derive(Debug)]
pub struct ReplyReader<R>(oneshot::Receiver<ReplyIterator<R>>);
impl<R: Send + 'static> ReplyReader<R> {
pub fn try_read(&mut self) -> Option<impl Iterator<Item = R>> {
self.0.try_recv().ok()?
}
pub fn read(self) -> Option<impl Iterator<Item = R>> {
pollster::block_on(self)
}
}
impl<R> Future for ReplyReader<R>
where
    R: Send + 'static,
{
    type Output = Option<ReplyIterator<R>>;

    /// Polls the wrapped receiver, turning a receive error into `None`.
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let receiver = Pin::new(&mut self.get_mut().0);

        match receiver.poll(cx) {
            std::task::Poll::Ready(received) => std::task::Poll::Ready(received.ok()),
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}
/// Sending half of the one-shot channel created by `query_replier`.
///
/// It wraps a `oneshot::Sender` used to hand a `ReplyIterator<R>` to the
/// matching `ReplyReader`.
pub(crate) struct ReplyWriter<R>(oneshot::Sender<ReplyIterator<R>>);
impl<R> ReplyWriter<R>
where
    R: Send + 'static,
{
    /// Sends `reply` through the wrapped one-shot sender, consuming the
    /// writer.
    ///
    /// The outcome of the send is deliberately discarded, so a failed send
    /// is silently ignored.
    pub(crate) fn send(self, reply: ReplyIterator<R>) {
        drop(self.0.send(reply));
    }
}
pub(crate) fn query_replier<R: Send + 'static>() -> (ReplyWriter<R>, ReplyReader<R>) {
let (tx, rx) = oneshot::channel();
(ReplyWriter(tx), ReplyReader(rx))
}