#![cfg(feature = "bus-cluster")]
use std::sync::Arc;
use std::time::Duration;
use atomr_cluster_tools::{ClusterPubSub, DistributedPubSub, MediatorPdu, MediatorTransport};
use atomr_config::Config;
use atomr_core::actor::ActorSystem;
use atomr_patterns::bus::DomainEventBus;
use atomr_patterns::topology::Topology;
use parking_lot::Mutex;
#[derive(Default, Clone)]
struct LoopbackTransport {
inner: Arc<Mutex<std::collections::HashMap<String, Arc<ClusterPubSub>>>>,
}
impl LoopbackTransport {
fn register(&self, node: impl Into<String>, cluster: Arc<ClusterPubSub>) {
self.inner.lock().insert(node.into(), cluster);
}
}
impl MediatorTransport for LoopbackTransport {
fn send(&self, target_node: &str, pdu: MediatorPdu) {
let target = self.inner.lock().get(target_node).cloned();
if let Some(c) = target {
c.apply_pdu(pdu);
}
}
}
#[tokio::test]
async fn event_published_on_node_a_reaches_subscriber_on_node_b() {
let system_a = ActorSystem::create("node-a", Config::reference()).await.unwrap();
let system_b = ActorSystem::create("node-b", Config::reference()).await.unwrap();
let transport: Arc<LoopbackTransport> = Arc::new(LoopbackTransport::default());
let transport_arc: Arc<dyn MediatorTransport> = transport.clone();
let local_a = DistributedPubSub::new();
let local_b = DistributedPubSub::new();
let cluster_a = ClusterPubSub::new(local_a.clone(), "node-a", transport_arc.clone());
let cluster_b = ClusterPubSub::new(local_b.clone(), "node-b", transport_arc.clone());
transport.register("node-a", cluster_a.clone());
transport.register("node-b", cluster_b.clone());
let bus_a = DomainEventBus::<u32>::builder()
.name("orders")
.cluster(local_a.clone(), cluster_a.clone())
.topic("orders")
.type_id("u32")
.codec(
|e: &u32| e.to_le_bytes().to_vec(),
|b: &[u8]| {
let arr: [u8; 4] = b.try_into().map_err(|_| "len".to_string())?;
Ok(u32::from_le_bytes(arr))
},
)
.build()
.materialize(&system_a)
.await
.unwrap();
let bus_b = DomainEventBus::<u32>::builder()
.name("orders")
.cluster(local_b.clone(), cluster_b.clone())
.topic("orders")
.type_id("u32")
.codec(
|e: &u32| e.to_le_bytes().to_vec(),
|b: &[u8]| {
let arr: [u8; 4] = b.try_into().map_err(|_| "len".to_string())?;
Ok(u32::from_le_bytes(arr))
},
)
.build()
.materialize(&system_b)
.await
.unwrap();
let mut sub_b = bus_b.subscribe();
cluster_b.announce_to("node-a");
tokio::time::sleep(Duration::from_millis(20)).await;
bus_a.publish(42u32);
let received = tokio::time::timeout(Duration::from_secs(1), sub_b.recv())
.await
.expect("timed out")
.expect("subscriber closed");
assert_eq!(received, 42);
system_a.terminate().await;
system_b.terminate().await;
}