use core::fmt;
use std::{
collections::HashMap,
fmt::Formatter,
sync::{
mpsc::{channel, Receiver, Sender},
Arc,
},
};
use async_trait::async_trait;
use futures::{future::BoxFuture, lock::Mutex};
use thiserror::Error;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use flarch::spawn_local;
#[derive(Debug, Error)]
pub enum BrokerError {
#[error("While sending to {0}")]
SendQueue(String),
}
pub type BrokerID = U256;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Destination {
All,
NoTap,
Forwarded(Vec<BrokerID>),
Handled(usize),
}
enum SubsystemAction<T> {
Add(usize, Subsystem<T>),
Remove(usize),
}
impl<T> fmt::Debug for SubsystemAction<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
SubsystemAction::Add(id, ss) => write!(f, "Add({}, {ss:?})", id),
SubsystemAction::Remove(id) => write!(f, "Remove({})", id),
}
}
}
#[cfg(feature = "nosend")]
mod asy {
pub trait Async {}
impl<T> Async for T {}
}
#[cfg(not(feature = "nosend"))]
mod asy {
pub trait Async: Sync + Send {}
impl<T: Sync + Send> Async for T {}
}
use asy::*;
use crate::nodeids::U256;
pub struct Broker<T: Async + Clone + fmt::Debug> {
intern_tx: UnboundedSender<InternMessage<T>>,
subsystems: Arc<Mutex<usize>>,
id: BrokerID,
}
impl<T: 'static + Async + Clone + fmt::Debug> Default for Broker<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Async + Clone + fmt::Debug> Clone for Broker<T> {
fn clone(&self) -> Self {
Self {
intern_tx: self.intern_tx.clone(),
subsystems: Arc::clone(&self.subsystems),
id: self.id,
}
}
}
impl<T: 'static + Async + Clone + fmt::Debug> Broker<T> {
pub fn new() -> Self {
let id = BrokerID::rnd();
let intern_tx = Intern::new(id);
Self {
intern_tx,
subsystems: Arc::new(Mutex::new(0)),
id,
}
}
pub async fn add_subsystem(&mut self, ss: Subsystem<T>) -> Result<usize, BrokerError> {
let subsystem = {
let mut subsystems = self.subsystems.lock().await;
*subsystems += 1;
*subsystems
};
self.intern_tx
.send(InternMessage::Subsystem(SubsystemAction::Add(
subsystem, ss,
)))
.map_err(|_| BrokerError::SendQueue("add_subsystem".into()))?;
Ok(subsystem)
}
pub async fn remove_subsystem(&mut self, ss: usize) -> Result<(), BrokerError> {
self.intern_tx
.send(InternMessage::Subsystem(SubsystemAction::Remove(ss)))
.map_err(|_| BrokerError::SendQueue("remove_subsystem".into()))?;
self.settle(vec![]).await
}
pub async fn get_tap(&mut self) -> Result<(UnboundedReceiver<T>, usize), BrokerError> {
let (tx, rx) = unbounded_channel();
let pos = self.add_subsystem(Subsystem::Tap(tx)).await?;
Ok((rx, pos))
}
pub async fn get_tap_sync(&mut self) -> Result<(Receiver<T>, usize), BrokerError> {
let (tx, rx) = channel();
let pos = self.add_subsystem(Subsystem::TapSync(tx)).await?;
Ok((rx, pos))
}
pub fn emit_msg_dest(&mut self, dst: Destination, msg: T) -> Result<(), BrokerError> {
self.intern_tx
.send(InternMessage::Message(dst, msg))
.map_err(|_| BrokerError::SendQueue("emit_msg_dest".into()))
}
pub fn emit_msg(&mut self, msg: T) -> Result<(), BrokerError> {
self.emit_msg_dest(Destination::All, msg)
}
pub async fn settle_msg_dest(&mut self, dst: Destination, msg: T) -> Result<(), BrokerError> {
self.intern_tx
.send(InternMessage::Message(dst, msg))
.map_err(|_| BrokerError::SendQueue("settle_msg_dest".into()))?;
self.settle(vec![]).await
}
pub async fn settle_msg(&mut self, msg: T) -> Result<(), BrokerError> {
self.settle_msg_dest(Destination::All, msg).await
}
pub async fn link_bi<R: 'static + Async + Clone + fmt::Debug>(
&mut self,
mut broker: Broker<R>,
link_rt: Translate<R, T>,
link_tr: Translate<T, R>,
) -> Result<(), BrokerError> {
self.forward(broker.clone(), link_tr).await;
broker.forward(self.clone(), link_rt).await;
Ok(())
}
pub async fn forward<R: 'static + Async + Clone + fmt::Debug>(
&mut self,
broker: Broker<R>,
link_tr: Translate<T, R>,
) {
let translator_tr = Translator {
broker,
translate: link_tr,
};
self.add_subsystem(Subsystem::Translator(Box::new(translator_tr)))
.await
.unwrap();
}
async fn settle(&mut self, callers: Vec<BrokerID>) -> Result<(), BrokerError> {
let (tx, mut rx) = unbounded_channel();
self.intern_tx
.send(InternMessage::Settle(callers.clone(), tx))
.map_err(|_| BrokerError::SendQueue("settle".into()))?;
rx.recv().await;
Ok(())
}
}
#[cfg(not(feature = "nosend"))]
pub type Translate<R, T> = Box<dyn Fn(R) -> Option<T> + Send + 'static>;
#[cfg(feature = "nosend")]
pub type Translate<R, T> = Box<dyn Fn(R) -> Option<T> + 'static>;
struct Translator<R: Clone, S: Async + Clone + fmt::Debug> {
broker: Broker<S>,
translate: Translate<R, S>,
}
#[cfg_attr(feature = "nosend", async_trait(?Send))]
#[cfg_attr(not(feature = "nosend"), async_trait)]
impl<R: Async + Clone + fmt::Debug, S: 'static + Async + Clone + fmt::Debug> SubsystemTranslator<R>
for Translator<R, S>
{
async fn translate(&mut self, trail: Vec<BrokerID>, msg: R) -> bool {
let msg_res = (self.translate)(msg.clone());
if let Some(msg_tr) = msg_res {
log::trace!(
"Translated {} -> {}: {msg_tr:?}, sending to {}",
std::any::type_name::<R>(),
std::any::type_name::<S>(),
self.broker.id,
);
self.broker
.emit_msg_dest(Destination::Forwarded(trail), msg_tr)
.err()
.map(|e| {
log::error!(
"{:p}: Translated message {} -> {} couldn't be queued: {e}",
self,
std::any::type_name::<R>(),
std::any::type_name::<S>()
);
});
return true;
}
false
}
async fn settle(&mut self, callers: Vec<BrokerID>) -> Result<(), BrokerError> {
if !callers.contains(&self.broker.id) {
self.broker.settle(callers).await?;
}
Ok(())
}
}
enum InternMessage<T: Async + Clone + fmt::Debug> {
Subsystem(SubsystemAction<T>),
Message(Destination, T),
Settle(Vec<BrokerID>, UnboundedSender<bool>),
}
struct Intern<T: Async + Clone + fmt::Debug> {
main_rx: UnboundedReceiver<InternMessage<T>>,
subsystems: HashMap<usize, Subsystem<T>>,
msg_queue: Vec<(Destination, T)>,
id: BrokerID,
}
impl<T: Async + Clone + fmt::Debug + 'static> Intern<T> {
fn new(id: BrokerID) -> UnboundedSender<InternMessage<T>> {
log::trace!("Creating Broker {} for {}", id, std::any::type_name::<T>());
let (main_tx, main_rx) = unbounded_channel::<InternMessage<T>>();
spawn_local(async move {
let mut intern = Self {
main_rx,
subsystems: HashMap::new(),
msg_queue: vec![],
id,
};
loop {
if !intern.get_msg().await {
log::warn!(
"{}: Closed Intern.main_rx for {}",
intern.id,
intern.type_id()
);
return;
}
if intern.msg_queue.len() > 0 {
match intern.process().await {
Ok(nbr) => log::trace!("{}: Processed {nbr} messages", intern.type_id()),
Err(e) => {
log::error!("{}: Couldn't process: {e:?}", intern.type_id());
}
}
}
}
});
main_tx
}
async fn process(&mut self) -> Result<usize, BrokerError> {
let mut msg_count: usize = 0;
loop {
self.process_once().await?;
if self.msg_queue.len() == 0 {
break;
}
msg_count += self.msg_queue.len();
}
Ok(msg_count)
}
async fn get_msg(&mut self) -> bool {
let msg_queue = match self.main_rx.recv().await {
Some(msg) => msg,
None => {
return false;
}
};
let msg = match msg_queue {
InternMessage::Subsystem(ss) => {
log::trace!("{self:p}/{} subsystem action {ss:?}", self.type_id());
self.subsystem_action(ss);
return true;
}
InternMessage::Message(dst, msg) => (dst, msg),
InternMessage::Settle(list, reply) => {
let type_id = self.type_id();
if !list.contains(&self.id) {
let mut list = list.clone();
list.push(self.id);
for (_, ss) in self.subsystems.iter_mut() {
ss.settle(list.clone())
.await
.err()
.map(|e| log::error!("{}: While settling: {e:?}", type_id));
}
}
reply
.send(true)
.err()
.map(|e| log::error!("{}: Couldn't send: {e:?}", type_id));
return true;
}
};
self.msg_queue.push(msg);
true
}
fn subsystem_action(&mut self, ssa: SubsystemAction<T>) {
match ssa {
SubsystemAction::Add(pos, s) => {
self.subsystems.insert(pos, s);
}
SubsystemAction::Remove(pos) => {
self.subsystems.remove(&pos);
}
}
}
async fn process_once(&mut self) -> Result<(), BrokerError> {
self.process_translate_messages().await;
let mut subsystems_error = self.send_tap().await;
subsystems_error.extend(self.process_handle_messages().await);
for index in subsystems_error.iter().rev() {
self.subsystem_action(SubsystemAction::Remove(*index));
}
Ok(())
}
async fn process_translate_messages(&mut self) {
let mut i = 0;
while i < self.msg_queue.len() {
let (dst, msg) = &mut self.msg_queue[i];
let mut trail = vec![];
if let Destination::Forwarded(t) = dst.clone() {
trail.extend(t);
}
if trail.contains(&self.id) {
log::trace!(
"{}: Endless forward-loop detected, aborting",
self.type_id()
);
i += 1;
continue;
}
trail.push(self.id);
let mut translated = false;
for (_, ss) in self
.subsystems
.iter_mut()
.filter(|(_, ss)| ss.is_translator())
{
if match ss {
Subsystem::Translator(ref mut translator) => {
translator.translate(trail.clone(), msg.clone()).await
}
Subsystem::TranslatorCallback(translator) => {
(translator)(trail.clone(), msg.clone()).await
}
_ => false,
} {
translated = true;
}
}
if translated {
self.msg_queue.remove(i);
} else {
i += 1;
}
}
}
async fn process_handle_messages(&mut self) -> Vec<usize> {
if self.msg_queue.len() == 0 {
return vec![];
}
let mut ss_remove = vec![];
let mut new_msg_queue = vec![];
let type_id = self.type_id();
for (index_ss, ss) in self.subsystems.iter_mut().filter(|(_, ss)| ss.is_handler()) {
let msgs: Vec<T> = self
.msg_queue
.iter()
.filter(|nm| match nm.0 {
Destination::Handled(i) => index_ss != &i,
_ => true,
})
.map(|nm| &nm.1)
.cloned()
.collect();
match ss.put_messages(*index_ss, msgs.clone()).await {
Ok(mut new_msgs) => {
new_msg_queue.append(&mut new_msgs);
}
Err(e) => {
ss_remove.push(*index_ss);
log::error!("{}: While sending messages: {e}", type_id);
}
}
}
self.msg_queue = new_msg_queue;
ss_remove
}
async fn send_tap(&mut self) -> Vec<usize> {
let mut faulty = vec![];
let msgs: Vec<T> = self
.msg_queue
.iter()
.filter(|(dst, _)| dst != &Destination::NoTap)
.map(|(_, msg)| msg.clone())
.collect();
let type_id = self.type_id();
for (i, ss) in self.subsystems.iter_mut().filter(|(_, ss)| ss.is_tap()) {
if let Err(e) = ss.put_messages(*i, msgs.clone()).await {
log::warn!(
"{}: Couldn't send to tap: {e:?} - perhaps you should use 'remove_subsystem'?",
type_id
);
faulty.push(*i);
}
}
faulty
}
fn type_id(&self) -> String {
std::any::type_name::<T>().into()
}
}
#[cfg(feature = "nosend")]
pub enum Subsystem<T> {
Tap(UnboundedSender<T>),
TapSync(Sender<T>),
Handler(Box<dyn SubsystemHandler<T>>),
Translator(Box<dyn SubsystemTranslator<T>>),
Callback(SubsystemCallback<T>),
TranslatorCallback(SubsystemTranslatorCallback<T>),
}
#[cfg(not(feature = "nosend"))]
pub enum Subsystem<T> {
Tap(UnboundedSender<T>),
TapSync(Sender<T>),
Handler(Box<dyn SubsystemHandler<T> + Send>),
Translator(Box<dyn SubsystemTranslator<T> + Send>),
Callback(SubsystemCallback<T>),
TranslatorCallback(SubsystemTranslatorCallback<T>),
}
impl<T: Async + Clone + fmt::Debug> Subsystem<T> {
async fn put_messages(
&mut self,
index: usize,
msgs: Vec<T>,
) -> Result<Vec<(Destination, T)>, BrokerError> {
Ok(match self {
Self::TapSync(s) => {
for msg in msgs {
s.send(msg.clone())
.map_err(|_| BrokerError::SendQueue("send_tap".into()))?;
}
vec![]
}
Self::Tap(s) => {
for msg in msgs {
s.send(msg.clone())
.map_err(|_| BrokerError::SendQueue("send_tap_async".into()))?;
}
vec![]
}
Self::Handler(h) => {
let ret = h.messages(msgs).await;
ret.into_iter()
.map(|m| (Destination::Handled(index), m))
.collect()
}
Self::Callback(h) => h(msgs).await,
_ => vec![],
})
}
fn is_tap(&self) -> bool {
matches!(self, Self::TapSync(_)) || matches!(self, Self::Tap(_))
}
fn is_translator(&self) -> bool {
matches!(self, Self::Translator(_)) || matches!(self, Self::TranslatorCallback(_))
}
fn is_handler(&self) -> bool {
matches!(self, Self::Handler(_)) || matches!(self, Self::Callback(_))
}
async fn settle(&mut self, callers: Vec<BrokerID>) -> Result<(), BrokerError> {
if let Self::Translator(tr) = self {
tr.settle(callers).await
} else {
Ok(())
}
}
}
impl<T> fmt::Debug for Subsystem<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::TapSync(_) => write!(f, "Tap"),
Self::Tap(_) => write!(f, "TapAsync"),
Self::Handler(_) => write!(f, "Handler"),
Self::Translator(_) => write!(f, "Translator"),
Self::TranslatorCallback(_) => write!(f, "TranslatorCallback"),
Self::Callback(_) => write!(f, "Callback"),
}
}
}
#[cfg_attr(feature = "nosend", async_trait(?Send))]
#[cfg_attr(not(feature = "nosend"), async_trait)]
pub trait SubsystemHandler<T: Async> {
async fn messages(&mut self, from_broker: Vec<T>) -> Vec<T>;
}
#[cfg_attr(feature = "nosend", async_trait(?Send))]
#[cfg_attr(not(feature = "nosend"), async_trait)]
pub trait SubsystemTranslator<T: Async> {
async fn translate(&mut self, trail: Vec<BrokerID>, from_broker: T) -> bool;
async fn settle(&mut self, callers: Vec<BrokerID>) -> Result<(), BrokerError>;
}
#[cfg(feature = "nosend")]
type SubsystemCallback<T> = Box<dyn Fn(Vec<T>) -> BoxFuture<'static, Vec<(Destination, T)>>>;
#[cfg(not(feature = "nosend"))]
type SubsystemCallback<T> =
Box<dyn Fn(Vec<T>) -> BoxFuture<'static, Vec<(Destination, T)>> + Send + Sync>;
#[cfg(feature = "nosend")]
type SubsystemTranslatorCallback<T> = Box<dyn Fn(Vec<BrokerID>, T) -> BoxFuture<'static, bool>>;
#[cfg(not(feature = "nosend"))]
type SubsystemTranslatorCallback<T> =
Box<dyn Fn(Vec<BrokerID>, T) -> BoxFuture<'static, bool> + Send + Sync>;
#[cfg(test)]
mod tests {
use flarch::{start_logging, start_logging_filter_level};
use super::*;
#[derive(Debug, Clone, PartialEq)]
pub enum BrokerTest {
MsgA,
MsgB,
}
pub struct Tps {
reply: Vec<(BrokerTest, BrokerTest)>,
}
#[cfg_attr(feature = "nosend", async_trait(?Send))]
#[cfg_attr(not(feature = "nosend"), async_trait)]
impl SubsystemHandler<BrokerTest> for Tps {
async fn messages(&mut self, msgs: Vec<BrokerTest>) -> Vec<BrokerTest> {
let mut output = vec![];
log::debug!("Msgs are: {:?} - Replies are: {:?}", msgs, self.reply);
for msg in msgs {
if let Some(bm) = self.reply.iter().find(|repl| repl.0 == msg) {
log::debug!("Found message");
output.push(bm.1.clone());
}
}
output
}
}
#[tokio::test]
async fn test_broker_new() -> Result<(), BrokerError> {
start_logging();
let bm_a = BrokerTest::MsgA;
let bm_b = BrokerTest::MsgB;
let broker = &mut Broker::new();
broker
.add_subsystem(Subsystem::Handler(Box::new(Tps {
reply: vec![(bm_a.clone(), bm_b.clone())],
})))
.await?;
let (tap_tx, tap) = channel::<BrokerTest>();
broker.add_subsystem(Subsystem::TapSync(tap_tx)).await?;
broker.settle_msg(bm_b.clone()).await?;
assert_eq!(tap.try_iter().count(), 1);
broker.settle_msg(bm_a.clone()).await?;
assert_eq!(tap.try_iter().count(), 2);
broker
.add_subsystem(Subsystem::Handler(Box::new(Tps {
reply: vec![(bm_a.clone(), bm_b)],
})))
.await?;
broker.settle_msg(bm_a).await?;
assert_eq!(tap.try_iter().count(), 3);
Ok(())
}
#[derive(Clone, PartialEq, Debug)]
enum MessageA {
One,
Two,
Four,
}
fn translate_ab(msg: MessageA) -> Option<MessageB> {
match msg {
MessageA::Two => Some(MessageB::Deux),
_ => None,
}
}
fn translate_ba(msg: MessageB) -> Option<MessageA> {
match msg {
MessageB::Un => Some(MessageA::One),
_ => None,
}
}
#[derive(Clone, PartialEq, Debug)]
enum MessageB {
Un,
Deux,
Trois,
}
#[derive(Error, Debug)]
enum ConvertError {
#[error("Wrong conversion")]
Conversion(String),
#[error(transparent)]
Broker(#[from] BrokerError),
}
#[tokio::test]
async fn link() -> Result<(), ConvertError> {
start_logging_filter_level(vec![], log::LevelFilter::Trace);
let mut broker_a: Broker<MessageA> = Broker::new();
let (tap_a_tx, tap_a_rx) = channel::<MessageA>();
broker_a.add_subsystem(Subsystem::TapSync(tap_a_tx)).await?;
let mut broker_b: Broker<MessageB> = Broker::new();
let (tap_b_tx, tap_b_rx) = channel::<MessageB>();
broker_b.add_subsystem(Subsystem::TapSync(tap_b_tx)).await?;
broker_b
.link_bi(
broker_a.clone(),
Box::new(translate_ab),
Box::new(translate_ba),
)
.await?;
broker_a.settle_msg(MessageA::Two).await?;
if let Ok(msg) = tap_b_rx.try_recv() {
assert_eq!(MessageB::Deux, msg);
} else {
return Err(ConvertError::Conversion("A to B".to_string()));
}
broker_b.settle_msg(MessageB::Un).await?;
if let Ok(msg) = tap_a_rx.try_recv() {
assert_eq!(MessageA::One, msg);
} else {
return Err(ConvertError::Conversion("B to A".to_string()));
}
broker_a.settle_msg(MessageA::Four).await?;
tap_a_rx.recv().unwrap();
assert!(tap_b_rx.try_recv().is_err());
broker_b.settle_msg(MessageB::Trois).await?;
tap_b_rx.recv().unwrap();
assert!(tap_a_rx.try_recv().is_err());
Ok(())
}
#[tokio::test]
async fn link_infinite() -> Result<(), Box<dyn std::error::Error>> {
start_logging_filter_level(vec![], log::LevelFilter::Trace);
let mut a = Broker::<MessageA>::new();
let mut b = Broker::<MessageA>::new();
a.link_bi(
b.clone(),
Box::new(|msg| Some(msg)),
Box::new(|msg| Some(msg)),
)
.await?;
let (tap_a, _) = a.get_tap_sync().await?;
let (tap_b, _) = b.get_tap_sync().await?;
a.settle_msg(MessageA::One).await?;
assert!(tap_a.try_recv().is_ok());
assert!(tap_b.try_recv().is_err());
Ok(())
}
async fn do_something(from_broker: Vec<MessageA>) -> Vec<(Destination, MessageA)> {
from_broker
.iter()
.filter(|msg| msg == &&MessageA::One)
.map(|_| (Destination::All, MessageA::Two))
.collect()
}
#[tokio::test]
async fn test_callback() -> Result<(), Box<dyn std::error::Error>> {
start_logging_filter_level(vec![], log::LevelFilter::Trace);
let mut b = Broker::new();
b.add_subsystem(Subsystem::Callback(Box::new(|b_in| {
Box::pin(do_something(b_in))
})))
.await?;
let tap = b.get_tap_sync().await?;
b.settle_msg_dest(Destination::NoTap, MessageA::One).await?;
assert_eq!(MessageA::Two, tap.0.recv()?);
Ok(())
}
async fn test_translator_cb(msg: MessageA) -> bool {
matches!(msg, MessageA::One)
}
#[tokio::test]
async fn test_translator_callback() -> Result<(), Box<dyn std::error::Error>> {
start_logging_filter_level(vec![], log::LevelFilter::Trace);
let mut b = Broker::new();
b.add_subsystem(Subsystem::TranslatorCallback(Box::new(|_, b_in| {
Box::pin(test_translator_cb(b_in))
})))
.await?;
let tap = b.get_tap_sync().await?;
b.settle_msg(MessageA::One).await?;
assert!(tap.0.try_recv().is_err());
b.settle_msg(MessageA::Two).await?;
assert_eq!(MessageA::Two, tap.0.recv()?);
Ok(())
}
}