use core::fmt;
use std::{
collections::HashMap,
fmt::Formatter,
sync::{
mpsc::{channel, Receiver, Sender},
Arc,
},
};
use async_trait::async_trait;
use futures::{channel::mpsc, lock::Mutex, SinkExt};
use thiserror::Error;
use crate::time::wait_ms;
#[derive(Debug, Error)]
pub enum BrokerError {
#[error("While sending to tap")]
SendQueue,
#[error("While decoding BrokerMessage")]
BMDecode,
#[error("Internal structure is locked")]
Locked,
#[error(transparent)]
SendMPSC(#[from] mpsc::SendError),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Destination {
All,
Others,
This,
NoTap,
}
enum SubsystemAction<T> {
Add(usize, Subsystem<T>),
Remove(usize),
}
impl<T> fmt::Debug for SubsystemAction<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
SubsystemAction::Add(id, ss) => write!(f, "Add({}, {ss:?})", id),
SubsystemAction::Remove(id) => write!(f, "Remove({})", id),
}
}
}
#[cfg(feature = "wasm")]
mod asy {
pub trait Async {}
impl<T> Async for T {}
}
#[cfg(not(feature = "wasm"))]
mod asy {
pub trait Async: Sync + Send {}
impl<T: Sync + Send> Async for T {}
}
pub use asy::*;
pub struct Broker<T: Async + Clone + fmt::Debug> {
intern: Arc<Mutex<Intern<T>>>,
intern_tx: mpsc::UnboundedSender<(Destination, T)>,
subsystem_tx: mpsc::UnboundedSender<SubsystemAction<T>>,
subsystems: Arc<Mutex<usize>>,
}
impl<T: 'static + Async + Clone + fmt::Debug> Default for Broker<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Async + Clone + fmt::Debug> Clone for Broker<T> {
fn clone(&self) -> Self {
Self {
intern: Arc::clone(&self.intern),
intern_tx: self.intern_tx.clone(),
subsystem_tx: self.subsystem_tx.clone(),
subsystems: Arc::clone(&self.subsystems),
}
}
}
impl<T: 'static + Async + Clone + fmt::Debug> Broker<T> {
pub fn new() -> Self {
let (subsystem_tx, subsystem_rx) = mpsc::unbounded();
let intern = Intern::new(subsystem_rx);
Self {
intern_tx: intern.clone_tx(),
intern: Arc::new(Mutex::new(intern)),
subsystem_tx,
subsystems: Arc::new(Mutex::new(0)),
}
}
pub async fn add_subsystem(&mut self, ss: Subsystem<T>) -> Result<usize, BrokerError> {
let subsystem = {
let mut subsystems = self.subsystems.lock().await;
*subsystems += 1;
*subsystems
};
self.subsystem_tx
.send(SubsystemAction::Add(subsystem, ss))
.await?;
Ok(subsystem)
}
pub async fn remove_subsystem(&mut self, ss: usize) -> Result<(), BrokerError> {
self.subsystem_tx.send(SubsystemAction::Remove(ss)).await?;
self.process_retry(10).await?;
Ok(())
}
pub async fn get_tap(&mut self) -> Result<(Receiver<T>, usize), BrokerError> {
let (tx, rx) = channel();
let pos = self.add_subsystem(Subsystem::Tap(tx)).await?;
Ok((rx, pos))
}
pub async fn process_retry(&mut self, mut retries: i32) -> Result<usize, BrokerError> {
let mut intern = self.intern.try_lock();
while intern.is_none() && retries != 0 {
log::trace!("Couldn't lock intern - trying again in 100ms");
wait_ms(100).await;
intern = self.intern.try_lock();
if retries > 0 {
retries -= 1;
if retries == 0 {
log::warn!("Couldn't lock intern - stop trying");
}
}
}
intern.ok_or(BrokerError::Locked)?.process().await
}
pub async fn process(&mut self) -> Result<usize, BrokerError> {
self.process_retry(0).await
}
pub async fn enqueue_msg_dest(&mut self, dst: Destination, msg: T) -> Result<(), BrokerError> {
self.intern_tx.send((dst, msg)).await?;
Ok(())
}
pub async fn emit_msg_dest(&mut self, dst: Destination, msg: T) -> Result<usize, BrokerError> {
self.intern_tx.send((dst, msg)).await?;
self.process_retry(10).await
}
pub async fn enqueue_msg(&mut self, msg: T) -> Result<(), BrokerError> {
self.enqueue_msg_dest(Destination::Others, msg).await
}
pub async fn emit_msg(&mut self, msg: T) -> Result<usize, BrokerError> {
self.emit_msg_dest(Destination::Others, msg).await
}
pub async fn link_bi<R: 'static + Async + Clone + fmt::Debug>(
&mut self,
mut broker: Broker<R>,
link_rt: Translate<R, T>,
link_tr: Translate<T, R>,
) {
self.forward(broker.clone(), link_tr).await;
broker.forward(self.clone(), link_rt).await;
}
pub async fn forward<R: 'static + Async + Clone + fmt::Debug>(
&mut self,
broker: Broker<R>,
link_tr: Translate<T, R>,
) {
let translator_tr = Translator {
broker,
translate: link_tr,
};
self.add_subsystem(Subsystem::Translator(Box::new(translator_tr)))
.await
.unwrap();
}
#[cfg(feature = "wasm")]
pub fn emit_msg_wasm(&self, msg: T) {
let mut broker = self.clone();
wasm_bindgen_futures::spawn_local(async move {
broker
.emit_msg(msg)
.await
.err()
.map(|e| log::error!("{:?}", e));
});
}
}
#[cfg(not(feature = "wasm"))]
pub type Translate<R, T> = Box<dyn Fn(R) -> Option<T> + Send + 'static>;
#[cfg(feature = "wasm")]
pub type Translate<R, T> = Box<dyn Fn(R) -> Option<T> + 'static>;
struct Translator<R: Clone, S: Async + Clone + fmt::Debug> {
broker: Broker<S>,
translate: Translate<R, S>,
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl<R: Async + Clone + fmt::Debug, S: 'static + Async + Clone + fmt::Debug> SubsystemTranslator<R>
for Translator<R, S>
{
async fn translate(&mut self, msg: R) -> bool {
let mut translated = false;
let msg_res = (self.translate)(msg.clone());
if let Some(msg_tr) = msg_res {
log::trace!(
"Translated {} -> {}: {msg_tr:?}",
std::any::type_name::<R>(),
std::any::type_name::<S>()
);
translated = true;
if let Err(e) = self.broker.enqueue_msg(msg_tr).await {
log::error!("While sending message: {e}");
}
}
if translated {
if let Err(e) = self.broker.process().await {
log::trace!(
"Translated message {} queued, but not processed: {e}",
std::any::type_name::<S>()
);
}
}
translated
}
}
struct Intern<T: Async + Clone + fmt::Debug> {
main_tx: mpsc::UnboundedSender<(Destination, T)>,
subsystems: HashMap<usize, Subsystem<T>>,
msg_queue: HashMap<usize, Vec<(Destination, T)>>,
subsystem_rx: mpsc::UnboundedReceiver<SubsystemAction<T>>,
}
impl<T: Async + Clone + fmt::Debug> Intern<T> {
fn new(subsystem_rx: mpsc::UnboundedReceiver<SubsystemAction<T>>) -> Self {
let (main_tx, main_rx) = mpsc::unbounded::<(Destination, T)>();
let mut subsystems = HashMap::new();
subsystems.insert(0, Subsystem::ChannelFuture(main_rx));
let mut msg_queue = HashMap::new();
msg_queue.insert(0, Vec::new());
Self {
main_tx,
subsystems,
msg_queue,
subsystem_rx,
}
}
fn clone_tx(&self) -> mpsc::UnboundedSender<(Destination, T)> {
self.main_tx.clone()
}
fn subsystem_action(&mut self, ssa: SubsystemAction<T>) {
match ssa {
SubsystemAction::Add(pos, s) => {
self.subsystems.insert(pos, s);
self.msg_queue.insert(pos, vec![]);
}
SubsystemAction::Remove(pos) => {
self.subsystems.remove(&pos);
self.msg_queue.remove(&pos);
}
}
}
async fn process(&mut self) -> Result<usize, BrokerError> {
let mut msg_count: usize = 0;
loop {
let msgs = self.process_once().await?;
msg_count += msgs;
if msgs == 0 && self.msg_queue_len() == 0 {
break;
}
}
Ok(msg_count)
}
async fn process_once(&mut self) -> Result<usize, BrokerError> {
while let Ok(Some(ss)) = self.subsystem_rx.try_next() {
log::trace!("{self:p} subsystem action {ss:?}");
self.subsystem_action(ss);
}
let (msg_count, new_msgs) = self.process_get_new_messages().await?;
let new_msgs = self.process_translate_messages(new_msgs).await?;
let subsystems_remove = self.process_handle_messages(new_msgs).await;
for index in subsystems_remove.iter().rev() {
self.subsystem_action(SubsystemAction::Remove(*index));
}
Ok(msg_count)
}
async fn process_get_new_messages(
&mut self,
) -> Result<(usize, HashMap<usize, Vec<(Destination, T)>>), BrokerError> {
let mut msg_count: usize = 0;
let mut new_msgs = HashMap::new();
for (index, ss) in self.subsystems.iter_mut() {
let mut msgs: Vec<(Destination, T)> = self
.msg_queue
.get_mut(index)
.expect("Get msg_queue")
.drain(..)
.collect();
msgs.append(&mut ss.get_messages().await);
msg_count += msgs.len();
new_msgs.insert(*index, msgs);
}
self.msg_queue.clear();
Ok((msg_count, new_msgs))
}
async fn process_translate_messages(
&mut self,
mut new_msgs: HashMap<usize, Vec<(Destination, T)>>,
) -> Result<HashMap<usize, Vec<(Destination, T)>>, BrokerError> {
for (_, ss) in self.subsystems.iter_mut() {
if let Subsystem::Translator(ref mut translator) = ss {
for (_, nms) in new_msgs.iter_mut() {
let mut i = 0;
while i < nms.len() {
if translator.translate(nms[i].1.clone()).await {
nms.remove(i);
} else {
i += 1;
}
}
}
}
}
Ok(new_msgs)
}
async fn process_handle_messages(
&mut self,
new_msgs: HashMap<usize, Vec<(Destination, T)>>,
) -> Vec<usize> {
let mut ss_remove = vec![];
for (index, ss) in self.subsystems.iter_mut() {
let mut msg_queue = vec![];
if !ss.is_translator() {
for (index_nm, nms) in new_msgs.iter() {
if nms.len() == 0 {
continue;
}
let msgs = nms
.iter()
.filter(|nm| match nm.0 {
Destination::All => true,
Destination::Others => index_nm != index,
Destination::This => index_nm == index,
Destination::NoTap => !ss.is_tap(),
})
.map(|nm| &nm.1)
.cloned()
.collect();
match ss.put_messages(msgs).await {
Ok(mut msgs_new) => msg_queue.append(&mut msgs_new),
Err(e) => {
ss_remove.push(*index);
log::error!("While sending messages: {}", e);
}
}
}
}
self.msg_queue.insert(*index, msg_queue);
}
ss_remove
}
fn msg_queue_len(&self) -> usize {
self.msg_queue.iter().fold(0, |l, (_, q)| l + q.len())
}
}
#[cfg(feature = "wasm")]
pub enum Subsystem<T> {
ChannelFuture(mpsc::UnboundedReceiver<(Destination, T)>),
Channel(Receiver<(Destination, T)>),
Tap(Sender<T>),
Handler(Box<dyn SubsystemListener<T>>),
Translator(Box<dyn SubsystemTranslator<T>>),
}
#[cfg(not(feature = "wasm"))]
pub enum Subsystem<T> {
ChannelFuture(mpsc::UnboundedReceiver<(Destination, T)>),
Channel(Receiver<(Destination, T)>),
Tap(Sender<T>),
Handler(Box<dyn SubsystemListener<T> + Send>),
Translator(Box<dyn SubsystemTranslator<T> + Send>),
}
impl<T: Async + Clone + fmt::Debug> Subsystem<T> {
async fn get_messages(&mut self) -> Vec<(Destination, T)> {
match self {
Self::Channel(s) => {
let msgs = s.try_iter().collect();
msgs
}
Self::ChannelFuture(s) => {
let mut msgs = vec![];
while let Ok(Some(msg)) = s.try_next() {
msgs.push(msg);
}
msgs
}
_ => vec![],
}
}
async fn put_messages(&mut self, msgs: Vec<T>) -> Result<Vec<(Destination, T)>, BrokerError> {
Ok(match self {
Self::Tap(s) => {
for msg in msgs {
s.send(msg.clone()).map_err(|_| BrokerError::SendQueue)?;
}
vec![]
}
Self::Handler(h) => h.messages(msgs).await,
_ => vec![],
})
}
fn is_tap(&self) -> bool {
matches!(self, Self::Tap(_))
}
fn is_translator(&self) -> bool {
matches!(self, Self::Translator(_))
}
}
impl<T> fmt::Debug for Subsystem<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::Channel(_) => write!(f, "Channel"),
Self::ChannelFuture(_) => write!(f, "ChannelFuture"),
Self::Tap(_) => write!(f, "Tap"),
Self::Handler(_) => write!(f, "Handler"),
Self::Translator(_) => write!(f, "Translator"),
}
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait SubsystemListener<T: Async> {
async fn messages(&mut self, from_broker: Vec<T>) -> Vec<(Destination, T)>;
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait SubsystemTranslator<T: Async> {
async fn translate(&mut self, from_broker: T) -> bool;
}
#[cfg(test)]
mod tests {
use crate::start_logging;
use super::*;
#[derive(Debug, Clone, PartialEq)]
pub enum BrokerTest {
MsgA,
MsgB,
MsgC,
MsgD,
}
pub struct Tps {
reply: Vec<(BrokerTest, BrokerTest)>,
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl SubsystemListener<BrokerTest> for Tps {
async fn messages(&mut self, msgs: Vec<BrokerTest>) -> Vec<(Destination, BrokerTest)> {
let mut output = vec![];
log::debug!("Msgs are: {:?} - Replies are: {:?}", msgs, self.reply);
for msg in msgs {
if let Some(bm) = self.reply.iter().find(|repl| repl.0 == msg) {
log::debug!("Found message");
output.push((Destination::Others, bm.1.clone()));
}
}
output
}
}
#[tokio::test]
async fn test_broker_new() -> Result<(), BrokerError> {
start_logging();
let bm_a = BrokerTest::MsgA;
let bm_b = BrokerTest::MsgB;
let broker = &mut Broker::new();
broker
.add_subsystem(Subsystem::Handler(Box::new(Tps {
reply: vec![(bm_a.clone(), bm_b.clone())],
})))
.await?;
let (tap_tx, tap) = channel::<BrokerTest>();
broker.add_subsystem(Subsystem::Tap(tap_tx)).await?;
broker.emit_msg(bm_b.clone()).await?;
assert_eq!(tap.try_iter().count(), 1);
broker.emit_msg(bm_a.clone()).await?;
broker.process().await?;
assert_eq!(tap.try_iter().count(), 2);
broker
.add_subsystem(Subsystem::Handler(Box::new(Tps {
reply: vec![(bm_a.clone(), bm_b)],
})))
.await?;
broker.emit_msg(bm_a).await?;
broker.process().await?;
assert_eq!(tap.try_iter().count(), 3);
Ok(())
}
struct DestinationTest {
reply_tx: Sender<BrokerTest>,
listen: BrokerTest,
dest: Destination,
}
impl DestinationTest {
async fn start(
b: &mut Broker<BrokerTest>,
listen: BrokerTest,
dest: Destination,
) -> Result<Receiver<BrokerTest>, BrokerError> {
let (reply_tx, reply_rx) = channel::<BrokerTest>();
b.add_subsystem(Subsystem::Handler(Box::new(Self {
reply_tx,
listen,
dest,
})))
.await?;
Ok(reply_rx)
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl SubsystemListener<BrokerTest> for DestinationTest {
async fn messages(&mut self, msgs: Vec<BrokerTest>) -> Vec<(Destination, BrokerTest)> {
for msg in msgs {
if msg == self.listen {
return vec![(self.dest, BrokerTest::MsgA)];
}
if matches!(msg, BrokerTest::MsgA) {
if let Err(e) = self.reply_tx.send(msg.clone()) {
log::error!("While sending: {e}");
}
}
}
vec![]
}
}
fn received(wanted: &[bool], rxs: &[&Receiver<BrokerTest>]) -> bool {
if wanted.len() != rxs.len() {
log::error!("Not same length");
return false;
}
let mut effective = vec![];
for rx in rxs {
effective.push(rx.try_iter().count() > 0);
}
if effective == wanted {
return true;
} else {
log::error!("Wanted: {wanted:?} - Got: {effective:?}");
return false;
}
}
#[tokio::test]
async fn test_destination() -> Result<(), BrokerError> {
start_logging();
let broker = &mut Broker::<BrokerTest>::new();
let t1 = &DestinationTest::start(broker, BrokerTest::MsgB, Destination::All).await?;
let t2 = &DestinationTest::start(broker, BrokerTest::MsgC, Destination::Others).await?;
let t3 = &DestinationTest::start(broker, BrokerTest::MsgD, Destination::This).await?;
broker.emit_msg(BrokerTest::MsgB).await?;
broker.process().await?;
assert!(received(&[true, true, true], &[t1, t2, t3]));
broker.emit_msg(BrokerTest::MsgC).await?;
broker.process().await?;
assert!(received(&[true, false, true], &[t1, t2, t3]));
broker.emit_msg(BrokerTest::MsgD).await?;
broker.process().await?;
assert!(received(&[false, false, true], &[t1, t2, t3]));
Ok(())
}
#[derive(Clone, PartialEq, Debug)]
enum MessageA {
One,
Two,
Four,
}
fn translate_ab(msg: MessageA) -> Option<MessageB> {
match msg {
MessageA::Two => Some(MessageB::Deux),
_ => None,
}
}
fn translate_ba(msg: MessageB) -> Option<MessageA> {
match msg {
MessageB::Un => Some(MessageA::One),
_ => None,
}
}
#[derive(Clone, PartialEq, Debug)]
enum MessageB {
Un,
Deux,
Trois,
}
#[derive(Error, Debug)]
enum ConvertError {
#[error("Wrong conversion")]
Conversion(String),
#[error(transparent)]
Broker(#[from] BrokerError),
}
#[tokio::test]
async fn link() -> Result<(), ConvertError> {
start_logging();
let mut broker_a: Broker<MessageA> = Broker::new();
let (tap_a_tx, tap_a_rx) = channel::<MessageA>();
broker_a.add_subsystem(Subsystem::Tap(tap_a_tx)).await?;
let mut broker_b: Broker<MessageB> = Broker::new();
let (tap_b_tx, tap_b_rx) = channel::<MessageB>();
broker_b.add_subsystem(Subsystem::Tap(tap_b_tx)).await?;
broker_b
.link_bi(
broker_a.clone(),
Box::new(translate_ab),
Box::new(translate_ba),
)
.await;
broker_a.emit_msg(MessageA::Two).await?;
if let Ok(msg) = tap_b_rx.try_recv() {
assert_eq!(MessageB::Deux, msg);
} else {
return Err(ConvertError::Conversion("A to B".to_string()));
}
broker_b.emit_msg(MessageB::Un).await?;
if let Ok(msg) = tap_a_rx.try_recv() {
assert_eq!(MessageA::One, msg);
} else {
return Err(ConvertError::Conversion("B to A".to_string()));
}
broker_a.emit_msg(MessageA::Four).await?;
tap_a_rx.recv().unwrap();
assert!(tap_b_rx.try_recv().is_err());
broker_b.emit_msg(MessageB::Trois).await?;
tap_b_rx.recv().unwrap();
assert!(tap_a_rx.try_recv().is_err());
Ok(())
}
}