use std::fmt;
use std::sync::{Arc, Mutex};
mod broadcaster;
mod sender;
use crate::model::{InputFn, Model, ReplierFn};
use crate::simulation::{Address, EventSlot, EventStream};
use crate::util::spsc_queue;
use broadcaster::Broadcaster;
use self::sender::{EventSender, EventSlotSender, EventStreamSender, QuerySender};
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct LineId(u64);
pub struct Output<T: Clone + Send + 'static> {
broadcaster: Broadcaster<T, ()>,
next_line_id: u64,
}
impl<T: Clone + Send + 'static> Output<T> {
pub fn new() -> Self {
Self::default()
}
pub fn connect<M, F, S>(&mut self, input: F, address: impl Into<Address<M>>) -> LineId
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Copy,
S: Send + 'static,
{
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(EventSender::new(input, address.into().0));
self.broadcaster.add(sender, line_id);
line_id
}
pub fn connect_stream(&mut self) -> (EventStream<T>, LineId) {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let (producer, consumer) = spsc_queue::spsc_queue();
let sender = Box::new(EventStreamSender::new(producer));
let event_stream = EventStream::new(consumer);
self.broadcaster.add(sender, line_id);
(event_stream, line_id)
}
pub fn connect_slot(&mut self) -> (EventSlot<T>, LineId) {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let slot = Arc::new(Mutex::new(None));
let sender = Box::new(EventSlotSender::new(slot.clone()));
let event_slot = EventSlot::new(slot);
self.broadcaster.add(sender, line_id);
(event_slot, line_id)
}
pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> {
if self.broadcaster.remove(line_id) {
Ok(())
} else {
Err(LineError {})
}
}
pub fn disconnect_all(&mut self) {
self.broadcaster.clear();
}
pub async fn send(&mut self, arg: T) {
self.broadcaster.broadcast_event(arg).await.unwrap();
}
}
impl<T: Clone + Send + 'static> Default for Output<T> {
fn default() -> Self {
Self {
broadcaster: Broadcaster::default(),
next_line_id: 0,
}
}
}
impl<T: Clone + Send + 'static> fmt::Debug for Output<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Output ({} connected ports)", self.broadcaster.len())
}
}
pub struct Requestor<T: Clone + Send + 'static, R: Send + 'static> {
broadcaster: Broadcaster<T, R>,
next_line_id: u64,
}
impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
pub fn new() -> Self {
Self::default()
}
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>) -> LineId
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Copy,
S: Send + 'static,
{
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(QuerySender::new(replier, address.into().0));
self.broadcaster.add(sender, line_id);
line_id
}
pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> {
if self.broadcaster.remove(line_id) {
Ok(())
} else {
Err(LineError {})
}
}
pub fn disconnect_all(&mut self) {
self.broadcaster.clear();
}
pub async fn send(&mut self, arg: T) -> impl Iterator<Item = R> + '_ {
self.broadcaster.broadcast_query(arg).await.unwrap()
}
}
impl<T: Clone + Send + 'static, R: Send + 'static> Default for Requestor<T, R> {
fn default() -> Self {
Self {
broadcaster: Broadcaster::default(),
next_line_id: 0,
}
}
}
impl<T: Clone + Send + 'static, R: Send + 'static> fmt::Debug for Requestor<T, R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Requestor ({} connected ports)", self.broadcaster.len())
}
}
#[derive(Copy, Clone, Debug)]
pub struct LineError {}