atomr_patterns/bus/
mod.rs1#[cfg(feature = "bus-cluster")]
13mod cluster;
14
15#[cfg(feature = "bus-cluster")]
16use cluster::ClusterConfig;
17
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use atomr_core::actor::ActorSystem;
23#[cfg(feature = "bus-cluster")]
24use atomr_core::actor::{Actor, Context, Props};
25use parking_lot::RwLock;
26use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
27
28use crate::topology::Topology;
29use crate::PatternError;
30
31pub struct DomainEventBus<E>(PhantomData<E>);
33
34impl<E: Clone + Send + 'static> DomainEventBus<E> {
35 pub fn builder() -> BusBuilder<E> {
36 BusBuilder {
37 name: None,
38 #[cfg(feature = "bus-cluster")]
39 cluster: None,
40 _ev: PhantomData,
41 }
42 }
43}
44
/// Fluent builder that collects the settings for a bus before it is turned
/// into a [`BusTopology`] by `build`.
pub struct BusBuilder<E>
where
    E: Clone + Send + 'static,
{
    /// Optional bus name; `build` falls back to `"bus"` when unset.
    name: Option<String>,
    /// Cluster wiring, present only after `cluster(..)` has been called.
    #[cfg(feature = "bus-cluster")]
    cluster: Option<ClusterConfig<E>>,
    /// Ties the builder to the event type without storing an event.
    _ev: PhantomData<E>,
}
51
52impl<E: Clone + Send + 'static> BusBuilder<E> {
53 pub fn name(mut self, n: impl Into<String>) -> Self {
54 self.name = Some(n.into());
55 self
56 }
57
58 #[cfg(feature = "bus-cluster")]
65 pub fn cluster(
66 mut self,
67 local: Arc<atomr_cluster_tools::DistributedPubSub>,
68 cluster: Arc<atomr_cluster_tools::ClusterPubSub>,
69 ) -> Self {
70 let topic = self.name.clone().unwrap_or_else(|| "bus".into());
71 let cfg = ClusterConfig {
72 local,
73 cluster,
74 topic: topic.clone(),
75 type_id: topic,
76 encode: Arc::new(|_e: &E| Vec::new()),
77 decode: Arc::new(|_b: &[u8]| Err("codec not configured".into())),
78 };
79 self.cluster = Some(cfg);
80 self
81 }
82
83 #[cfg(feature = "bus-cluster")]
86 pub fn topic(mut self, topic: impl Into<String>) -> Self {
87 if let Some(c) = self.cluster.as_mut() {
88 c.topic = topic.into();
89 }
90 self
91 }
92
93 #[cfg(feature = "bus-cluster")]
96 pub fn type_id(mut self, id: impl Into<String>) -> Self {
97 if let Some(c) = self.cluster.as_mut() {
98 c.type_id = id.into();
99 }
100 self
101 }
102
103 #[cfg(feature = "bus-cluster")]
107 pub fn codec<EncFn, DecFn>(mut self, encode: EncFn, decode: DecFn) -> Self
108 where
109 EncFn: Fn(&E) -> Vec<u8> + Send + Sync + 'static,
110 DecFn: Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static,
111 {
112 if let Some(c) = self.cluster.as_mut() {
113 c.encode = Arc::new(encode);
114 c.decode = Arc::new(decode);
115 }
116 self
117 }
118
119 pub fn build(self) -> BusTopology<E> {
120 BusTopology {
121 name: self.name.unwrap_or_else(|| "bus".into()),
122 #[cfg(feature = "bus-cluster")]
123 cluster: self.cluster,
124 _ev: PhantomData,
125 }
126 }
127}
128
/// Fully configured bus description, produced by `BusBuilder::build`.
pub struct BusTopology<E>
where
    E: Clone + Send + 'static,
{
    // Only read when the `bus-cluster` feature is enabled (it names the
    // router actor), hence the lint allowance for feature-less builds.
    #[allow(dead_code)]
    name: String,
    /// Cluster wiring carried over from the builder, if any.
    #[cfg(feature = "bus-cluster")]
    cluster: Option<ClusterConfig<E>>,
    /// Ties the topology to the event type without storing an event.
    _ev: PhantomData<E>,
}
136
137pub struct BusHandles<E: Clone + Send + 'static> {
140 inner: Arc<BusInner<E>>,
141}
142
143impl<E: Clone + Send + 'static> Clone for BusHandles<E> {
144 fn clone(&self) -> Self {
145 Self { inner: self.inner.clone() }
146 }
147}
148
149struct BusInner<E: Clone + Send + 'static> {
150 subscribers: RwLock<Vec<UnboundedSender<E>>>,
151 #[cfg(feature = "bus-cluster")]
152 cluster: Option<ClusterConfig<E>>,
153}
154
155impl<E: Clone + Send + 'static> BusHandles<E> {
156 pub fn publish(&self, event: E) {
161 #[cfg(feature = "bus-cluster")]
162 {
163 if let Some(cfg) = &self.inner.cluster {
164 let encode = cfg.encode.clone();
168 cfg.cluster.publish_remote::<E, _>(&cfg.topic, event, &cfg.type_id, |e| encode(e));
169 return;
170 }
171 }
172 let mut guard = self.inner.subscribers.write();
174 guard.retain(|tx| tx.send(event.clone()).is_ok());
175 }
176
177 pub fn subscribe(&self) -> UnboundedReceiver<E> {
183 let (tx, rx) = unbounded_channel();
184 self.inner.subscribers.write().push(tx);
185 rx
186 }
187}
188
189#[cfg(feature = "bus-cluster")]
192struct BusRouter<E: Clone + Send + 'static> {
193 inner: Arc<BusInner<E>>,
194}
195
196#[cfg(feature = "bus-cluster")]
197#[async_trait]
198impl<E: Clone + Send + 'static> Actor for BusRouter<E> {
199 type Msg = E;
200 async fn handle(&mut self, _ctx: &mut Context<Self>, msg: E) {
201 let mut guard = self.inner.subscribers.write();
202 guard.retain(|tx| tx.send(msg.clone()).is_ok());
203 }
204}
205
206#[async_trait]
207impl<E: Clone + Send + 'static> Topology for BusTopology<E> {
208 type Handles = BusHandles<E>;
209
210 #[cfg_attr(not(feature = "bus-cluster"), allow(unused_variables))]
211 async fn materialize(self, system: &ActorSystem) -> Result<Self::Handles, PatternError<()>> {
212 let inner = Arc::new(BusInner {
213 subscribers: RwLock::new(Vec::new()),
214 #[cfg(feature = "bus-cluster")]
215 cluster: self.cluster,
216 });
217 let handles = BusHandles { inner: inner.clone() };
218
219 #[cfg(feature = "bus-cluster")]
225 if let Some(cfg) = inner.cluster.as_ref() {
226 let router_inner = inner.clone();
227 let router_name = format!("bus-router-{}", self.name);
228 let router_ref = system
229 .actor_of(Props::create(move || BusRouter::<E> { inner: router_inner.clone() }), &router_name)
230 .map_err(|e| PatternError::Invariant(format!("spawn bus router: {e}")))?;
231
232 cfg.local.subscribe(cfg.topic.clone(), router_ref);
233
234 let local_for_decoder = cfg.local.clone();
235 let topic_for_decoder = cfg.topic.clone();
236 let decode = cfg.decode.clone();
237 cfg.cluster.register_decoder(cfg.type_id.clone(), move |bytes| match decode(bytes) {
238 Ok(event) => local_for_decoder.publish_msg::<E>(&topic_for_decoder, event) > 0,
239 Err(e) => {
240 tracing::warn!(error = %e, "cluster bus decode failed");
241 false
242 }
243 });
244 }
245
246 Ok(handles)
247 }
248}