Skip to main content

atomr_patterns/bus/
mod.rs

1//! Domain Event Bus pattern.
2//!
3//! In-process broadcast of domain events to interested subscribers.
4//! Typically used between a write-side [`crate::cqrs::CqrsPattern`]
5//! and downstream readers / sagas / external integrations.
6//!
7//! v2 also ships a cluster-wide variant behind the `bus-cluster`
8//! Cargo feature. Configure it via
9//! [`BusBuilder::cluster`] / [`BusBuilder::topic`] /
10//! [`BusBuilder::codec`].
11
12#[cfg(feature = "bus-cluster")]
13mod cluster;
14
15#[cfg(feature = "bus-cluster")]
16use cluster::ClusterConfig;
17
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use atomr_core::actor::ActorSystem;
23#[cfg(feature = "bus-cluster")]
24use atomr_core::actor::{Actor, Context, Props};
25use parking_lot::RwLock;
26use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
27
28use crate::topology::Topology;
29use crate::PatternError;
30
31/// Public handle to the bus pattern.
32pub struct DomainEventBus<E>(PhantomData<E>);
33
34impl<E: Clone + Send + 'static> DomainEventBus<E> {
35    pub fn builder() -> BusBuilder<E> {
36        BusBuilder {
37            name: None,
38            #[cfg(feature = "bus-cluster")]
39            cluster: None,
40            _ev: PhantomData,
41        }
42    }
43}
44
45pub struct BusBuilder<E: Clone + Send + 'static> {
46    name: Option<String>,
47    #[cfg(feature = "bus-cluster")]
48    cluster: Option<ClusterConfig<E>>,
49    _ev: PhantomData<E>,
50}
51
52impl<E: Clone + Send + 'static> BusBuilder<E> {
53    pub fn name(mut self, n: impl Into<String>) -> Self {
54        self.name = Some(n.into());
55        self
56    }
57
58    /// Enable cluster fan-out via
59    /// [`atomr_cluster_tools::ClusterPubSub`]. The caller supplies a
60    /// `local` [`atomr_cluster_tools::DistributedPubSub`] and a
61    /// `cluster` wrapper already constructed against a transport.
62    /// Combine with [`Self::topic`], [`Self::type_id`], and
63    /// [`Self::codec`] to finish the wiring.
64    #[cfg(feature = "bus-cluster")]
65    pub fn cluster(
66        mut self,
67        local: Arc<atomr_cluster_tools::DistributedPubSub>,
68        cluster: Arc<atomr_cluster_tools::ClusterPubSub>,
69    ) -> Self {
70        let topic = self.name.clone().unwrap_or_else(|| "bus".into());
71        let cfg = ClusterConfig {
72            local,
73            cluster,
74            topic: topic.clone(),
75            type_id: topic,
76            encode: Arc::new(|_e: &E| Vec::new()),
77            decode: Arc::new(|_b: &[u8]| Err("codec not configured".into())),
78        };
79        self.cluster = Some(cfg);
80        self
81    }
82
83    /// Set the cluster-wide topic name. Defaults to the bus name.
84    /// No-op unless `cluster()` was called first.
85    #[cfg(feature = "bus-cluster")]
86    pub fn topic(mut self, topic: impl Into<String>) -> Self {
87        if let Some(c) = self.cluster.as_mut() {
88            c.topic = topic.into();
89        }
90        self
91    }
92
93    /// Set the cross-node type tag used to dispatch incoming PDUs.
94    /// Defaults to the topic.
95    #[cfg(feature = "bus-cluster")]
96    pub fn type_id(mut self, id: impl Into<String>) -> Self {
97        if let Some(c) = self.cluster.as_mut() {
98            c.type_id = id.into();
99        }
100        self
101    }
102
103    /// Provide encode/decode closures for cross-node delivery.
104    /// `encode` is called for each `publish`; `decode` runs on
105    /// inbound PDUs from peer nodes.
106    #[cfg(feature = "bus-cluster")]
107    pub fn codec<EncFn, DecFn>(mut self, encode: EncFn, decode: DecFn) -> Self
108    where
109        EncFn: Fn(&E) -> Vec<u8> + Send + Sync + 'static,
110        DecFn: Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static,
111    {
112        if let Some(c) = self.cluster.as_mut() {
113            c.encode = Arc::new(encode);
114            c.decode = Arc::new(decode);
115        }
116        self
117    }
118
119    pub fn build(self) -> BusTopology<E> {
120        BusTopology {
121            name: self.name.unwrap_or_else(|| "bus".into()),
122            #[cfg(feature = "bus-cluster")]
123            cluster: self.cluster,
124            _ev: PhantomData,
125        }
126    }
127}
128
129pub struct BusTopology<E: Clone + Send + 'static> {
130    #[allow(dead_code)]
131    name: String,
132    #[cfg(feature = "bus-cluster")]
133    cluster: Option<ClusterConfig<E>>,
134    _ev: PhantomData<E>,
135}
136
137/// Bus handles. Use [`BusHandles::publish`] to push events; call
138/// [`BusHandles::subscribe`] to obtain a fresh receiver.
139pub struct BusHandles<E: Clone + Send + 'static> {
140    inner: Arc<BusInner<E>>,
141}
142
143impl<E: Clone + Send + 'static> Clone for BusHandles<E> {
144    fn clone(&self) -> Self {
145        Self { inner: self.inner.clone() }
146    }
147}
148
149struct BusInner<E: Clone + Send + 'static> {
150    subscribers: RwLock<Vec<UnboundedSender<E>>>,
151    #[cfg(feature = "bus-cluster")]
152    cluster: Option<ClusterConfig<E>>,
153}
154
155impl<E: Clone + Send + 'static> BusHandles<E> {
156    /// Broadcast `event` to every live subscriber. Closed receivers
157    /// are pruned in-line. When the bus is configured with
158    /// `cluster()`, also forwards to remote nodes via
159    /// [`atomr_cluster_tools::ClusterPubSub`].
160    pub fn publish(&self, event: E) {
161        #[cfg(feature = "bus-cluster")]
162        {
163            if let Some(cfg) = &self.inner.cluster {
164                // publish_remote fans out locally via DistributedPubSub
165                // (delivers to our internal BusRouter actor, which
166                // forwards to subscribers) AND forwards to peer nodes.
167                let encode = cfg.encode.clone();
168                cfg.cluster.publish_remote::<E, _>(&cfg.topic, event, &cfg.type_id, |e| encode(e));
169                return;
170            }
171        }
172        // Local-only path.
173        let mut guard = self.inner.subscribers.write();
174        guard.retain(|tx| tx.send(event.clone()).is_ok());
175    }
176
177    /// Subscribe and receive a fresh channel. The returned
178    /// [`UnboundedReceiver`] is closed when the bus drops or the
179    /// receiver is dropped. Subscribers receive both locally-published
180    /// events *and* events forwarded from peer nodes (when
181    /// clustered).
182    pub fn subscribe(&self) -> UnboundedReceiver<E> {
183        let (tx, rx) = unbounded_channel();
184        self.inner.subscribers.write().push(tx);
185        rx
186    }
187}
188
189/// Internal actor that bridges DistributedPubSub deliveries (typed
190/// `ActorRef<E>`) into the bus's mpsc subscriber list.
191#[cfg(feature = "bus-cluster")]
192struct BusRouter<E: Clone + Send + 'static> {
193    inner: Arc<BusInner<E>>,
194}
195
196#[cfg(feature = "bus-cluster")]
197#[async_trait]
198impl<E: Clone + Send + 'static> Actor for BusRouter<E> {
199    type Msg = E;
200    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: E) {
201        let mut guard = self.inner.subscribers.write();
202        guard.retain(|tx| tx.send(msg.clone()).is_ok());
203    }
204}
205
206#[async_trait]
207impl<E: Clone + Send + 'static> Topology for BusTopology<E> {
208    type Handles = BusHandles<E>;
209
210    #[cfg_attr(not(feature = "bus-cluster"), allow(unused_variables))]
211    async fn materialize(self, system: &ActorSystem) -> Result<Self::Handles, PatternError<()>> {
212        let inner = Arc::new(BusInner {
213            subscribers: RwLock::new(Vec::new()),
214            #[cfg(feature = "bus-cluster")]
215            cluster: self.cluster,
216        });
217        let handles = BusHandles { inner: inner.clone() };
218
219        // Cluster wiring: spawn a router actor, subscribe it to the
220        // local DistributedPubSub for the topic (so announce_to sees
221        // it), and register a decoder that re-publishes inbound peer
222        // PDUs into the same local pubsub. The router forwards
223        // received events into our subscribers list.
224        #[cfg(feature = "bus-cluster")]
225        if let Some(cfg) = inner.cluster.as_ref() {
226            let router_inner = inner.clone();
227            let router_name = format!("bus-router-{}", self.name);
228            let router_ref = system
229                .actor_of(Props::create(move || BusRouter::<E> { inner: router_inner.clone() }), &router_name)
230                .map_err(|e| PatternError::Invariant(format!("spawn bus router: {e}")))?;
231
232            cfg.local.subscribe(cfg.topic.clone(), router_ref);
233
234            let local_for_decoder = cfg.local.clone();
235            let topic_for_decoder = cfg.topic.clone();
236            let decode = cfg.decode.clone();
237            cfg.cluster.register_decoder(cfg.type_id.clone(), move |bytes| match decode(bytes) {
238                Ok(event) => local_for_decoder.publish_msg::<E>(&topic_for_decoder, event) > 0,
239                Err(e) => {
240                    tracing::warn!(error = %e, "cluster bus decode failed");
241                    false
242                }
243            });
244        }
245
246        Ok(handles)
247    }
248}