#[cfg(feature = "bus-cluster")]
mod cluster;
#[cfg(feature = "bus-cluster")]
use cluster::ClusterConfig;
use std::marker::PhantomData;
use std::sync::Arc;
use async_trait::async_trait;
use atomr_core::actor::ActorSystem;
#[cfg(feature = "bus-cluster")]
use atomr_core::actor::{Actor, Context, Props};
use parking_lot::RwLock;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::topology::Topology;
use crate::PatternError;
pub struct DomainEventBus<E>(PhantomData<E>);
impl<E: Clone + Send + 'static> DomainEventBus<E> {
pub fn builder() -> BusBuilder<E> {
BusBuilder {
name: None,
#[cfg(feature = "bus-cluster")]
cluster: None,
_ev: PhantomData,
}
}
}
pub struct BusBuilder<E: Clone + Send + 'static> {
name: Option<String>,
#[cfg(feature = "bus-cluster")]
cluster: Option<ClusterConfig<E>>,
_ev: PhantomData<E>,
}
impl<E: Clone + Send + 'static> BusBuilder<E> {
pub fn name(mut self, n: impl Into<String>) -> Self {
self.name = Some(n.into());
self
}
#[cfg(feature = "bus-cluster")]
pub fn cluster(
mut self,
local: Arc<atomr_cluster_tools::DistributedPubSub>,
cluster: Arc<atomr_cluster_tools::ClusterPubSub>,
) -> Self {
let topic = self.name.clone().unwrap_or_else(|| "bus".into());
let cfg = ClusterConfig {
local,
cluster,
topic: topic.clone(),
type_id: topic,
encode: Arc::new(|_e: &E| Vec::new()),
decode: Arc::new(|_b: &[u8]| Err("codec not configured".into())),
};
self.cluster = Some(cfg);
self
}
#[cfg(feature = "bus-cluster")]
pub fn topic(mut self, topic: impl Into<String>) -> Self {
if let Some(c) = self.cluster.as_mut() {
c.topic = topic.into();
}
self
}
#[cfg(feature = "bus-cluster")]
pub fn type_id(mut self, id: impl Into<String>) -> Self {
if let Some(c) = self.cluster.as_mut() {
c.type_id = id.into();
}
self
}
#[cfg(feature = "bus-cluster")]
pub fn codec<EncFn, DecFn>(mut self, encode: EncFn, decode: DecFn) -> Self
where
EncFn: Fn(&E) -> Vec<u8> + Send + Sync + 'static,
DecFn: Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static,
{
if let Some(c) = self.cluster.as_mut() {
c.encode = Arc::new(encode);
c.decode = Arc::new(decode);
}
self
}
pub fn build(self) -> BusTopology<E> {
BusTopology {
name: self.name.unwrap_or_else(|| "bus".into()),
#[cfg(feature = "bus-cluster")]
cluster: self.cluster,
_ev: PhantomData,
}
}
}
pub struct BusTopology<E: Clone + Send + 'static> {
#[allow(dead_code)]
name: String,
#[cfg(feature = "bus-cluster")]
cluster: Option<ClusterConfig<E>>,
_ev: PhantomData<E>,
}
pub struct BusHandles<E: Clone + Send + 'static> {
inner: Arc<BusInner<E>>,
}
impl<E: Clone + Send + 'static> Clone for BusHandles<E> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
struct BusInner<E: Clone + Send + 'static> {
subscribers: RwLock<Vec<UnboundedSender<E>>>,
#[cfg(feature = "bus-cluster")]
cluster: Option<ClusterConfig<E>>,
}
impl<E: Clone + Send + 'static> BusHandles<E> {
pub fn publish(&self, event: E) {
#[cfg(feature = "bus-cluster")]
{
if let Some(cfg) = &self.inner.cluster {
let encode = cfg.encode.clone();
cfg.cluster.publish_remote::<E, _>(&cfg.topic, event, &cfg.type_id, |e| encode(e));
return;
}
}
let mut guard = self.inner.subscribers.write();
guard.retain(|tx| tx.send(event.clone()).is_ok());
}
pub fn subscribe(&self) -> UnboundedReceiver<E> {
let (tx, rx) = unbounded_channel();
self.inner.subscribers.write().push(tx);
rx
}
}
#[cfg(feature = "bus-cluster")]
struct BusRouter<E: Clone + Send + 'static> {
inner: Arc<BusInner<E>>,
}
#[cfg(feature = "bus-cluster")]
#[async_trait]
impl<E: Clone + Send + 'static> Actor for BusRouter<E> {
type Msg = E;
async fn handle(&mut self, _ctx: &mut Context<Self>, msg: E) {
let mut guard = self.inner.subscribers.write();
guard.retain(|tx| tx.send(msg.clone()).is_ok());
}
}
#[async_trait]
impl<E: Clone + Send + 'static> Topology for BusTopology<E> {
type Handles = BusHandles<E>;
#[cfg_attr(not(feature = "bus-cluster"), allow(unused_variables))]
async fn materialize(self, system: &ActorSystem) -> Result<Self::Handles, PatternError<()>> {
let inner = Arc::new(BusInner {
subscribers: RwLock::new(Vec::new()),
#[cfg(feature = "bus-cluster")]
cluster: self.cluster,
});
let handles = BusHandles { inner: inner.clone() };
#[cfg(feature = "bus-cluster")]
if let Some(cfg) = inner.cluster.as_ref() {
let router_inner = inner.clone();
let router_name = format!("bus-router-{}", self.name);
let router_ref = system
.actor_of(Props::create(move || BusRouter::<E> { inner: router_inner.clone() }), &router_name)
.map_err(|e| PatternError::Invariant(format!("spawn bus router: {e}")))?;
cfg.local.subscribe(cfg.topic.clone(), router_ref);
let local_for_decoder = cfg.local.clone();
let topic_for_decoder = cfg.topic.clone();
let decode = cfg.decode.clone();
cfg.cluster.register_decoder(cfg.type_id.clone(), move |bytes| match decode(bytes) {
Ok(event) => local_for_decoder.publish_msg::<E>(&topic_for_decoder, event) > 0,
Err(e) => {
tracing::warn!(error = %e, "cluster bus decode failed");
false
}
});
}
Ok(handles)
}
}