#![allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;
use std::time::{Duration, Instant};
use liminal::channel::{ChannelConfig, ChannelHandle, ChannelMode, ChannelSupervisor, Schema};
use liminal_server::cluster::{self, ClusterHandle};
use liminal_server::config::types::ClusterConfig;
/// Distribution cookie shared by every node these tests start; it is passed
/// both to the channel supervisor and to the cluster configuration.
const COOKIE: &str = "srv005-loopback-cookie";
/// Polls `condition` every 20 ms until it returns `true` or `timeout` elapses.
///
/// Returns `true` as soon as `condition` holds, and `false` once `timeout`
/// has passed without it holding. `condition` is always evaluated at least
/// once, even when `timeout` is zero.
///
/// The deadline is tracked as time elapsed since the first poll instead of
/// computing `Instant::now() + timeout`, because that addition panics on
/// overflow for very large timeouts such as `Duration::MAX`.
fn eventually(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
    /// Pause between two consecutive evaluations of the condition.
    const POLL_INTERVAL: Duration = Duration::from_millis(20);

    let started = Instant::now();
    loop {
        if condition() {
            return true;
        }
        // Checked after the condition so a late success is never reported as a timeout.
        if started.elapsed() >= timeout {
            return false;
        }
        std::thread::sleep(POLL_INTERVAL);
    }
}
/// Returns a loopback address whose port was just handed out by the OS.
///
/// Binds `127.0.0.1:0`, reads back the assigned address, and releases the
/// listener before returning, so the caller can bind the address itself.
/// NOTE(review): nothing reserves the port after the listener is released, so
/// another process could in principle claim it first.
///
/// # Panics
///
/// Panics if the bind fails or the local address cannot be read.
fn free_port() -> SocketAddr {
    // The temporary listener is dropped before this function returns.
    TcpListener::bind("127.0.0.1:0")
        .expect("bind ephemeral port")
        .local_addr()
        .expect("read ephemeral addr")
}
/// One in-process cluster node as started by these tests.
struct Node {
    /// Channel supervisor created with distribution enabled; channel handles
    /// are built from clones of it.
    supervisor: ChannelSupervisor,
    /// Handle returned by `cluster::start`; queried for membership when
    /// counting peers.
    handle: ClusterHandle,
    /// Address the node was started with; shown in the `Debug` output.
    listen_addr: SocketAddr,
}
impl Node {
    /// Starts a node called `node_name` that listens on `listen_addr` and is
    /// configured with `seeds` as its seed nodes.
    ///
    /// A fresh resolver is shared between the distributed channel supervisor
    /// and the cluster; the callback handed to `cluster::start` installs the
    /// value it receives as the supervisor's observer.
    ///
    /// # Panics
    ///
    /// Panics if the supervisor or the cluster fails to start.
    fn start(node_name: &str, listen_addr: SocketAddr, seeds: Vec<SocketAddr>) -> Self {
        let resolver = Arc::new(cluster::discovery::ClusterResolver::new());
        let restart_policy = liminal::channel::ChannelRestartPolicy::default();
        let supervisor = ChannelSupervisor::with_distribution(
            node_name.to_owned(),
            1,
            COOKIE.to_owned(),
            cluster::discovery::as_resolver(Arc::clone(&resolver)),
            restart_policy,
        )
        .expect("clustered supervisor starts");

        let cluster_config = ClusterConfig {
            cookie: COOKIE.to_owned(),
            seed_nodes: seeds,
            listen_address: listen_addr,
            node_name: node_name.to_owned(),
        };

        let sched = supervisor.scheduler();
        // The callback needs its own supervisor handle because it is `move`.
        let observer_target = supervisor.clone();
        let handle = cluster::start(&sched, resolver, &cluster_config, move |sync| {
            observer_target.install_observer(Arc::new(sync));
        })
        .expect("cluster starts");

        Self {
            supervisor,
            handle,
            listen_addr,
        }
    }

    /// Builds an ephemeral channel handle named `name`, using an empty JSON
    /// schema and a clone of this node's supervisor.
    fn channel(&self, name: &str) -> ChannelHandle {
        let empty_schema = Schema::new(serde_json::json!({})).expect("empty schema");
        let channel_config =
            ChannelConfig::new(name.to_owned(), empty_schema, ChannelMode::Ephemeral);
        ChannelHandle::with_supervisor(channel_config, self.supervisor.clone())
    }

    /// Number of peers currently listed in this node's cluster membership.
    fn peer_count(&self) -> usize {
        let membership = self.handle.membership();
        membership.peers().len()
    }

    /// Asks this node's channel supervisor to shut down.
    fn shutdown(&self) {
        self.supervisor.shutdown();
    }
}
/// Waits up to ten seconds for `a` and `b` to each report at least one peer.
///
/// # Panics
///
/// Panics if either node still reports no peers when the wait times out.
fn await_mutual_membership(a: &Node, b: &Node) {
    let both_have_peers = || a.peer_count() >= 1 && b.peer_count() >= 1;
    let converged = eventually(Duration::from_secs(10), both_have_peers);
    assert!(converged, "both nodes should see each other as peers");
}
/// A subscription taken on node A must show up on node B as exactly one
/// remote member of the `orders` group.
#[test]
fn subscription_on_a_is_visible_as_remote_member_on_b() {
    let addr_a = free_port();
    let addr_b = free_port();
    // A starts without seeds; B is seeded with A's address.
    let node_a = Node::start("node-a@127.0.0.1", addr_a, Vec::new());
    let node_b = Node::start("node-b@127.0.0.1", addr_b, vec![addr_a]);
    await_mutual_membership(&node_a, &node_b);

    let channel_a = node_a.channel("orders");
    // Kept alive until the end of the test so the subscription stays registered.
    let _subscription = channel_a.subscribe().expect("subscribe on A");
    let channel_b = node_b.channel("orders");

    let propagated = eventually(Duration::from_secs(10), || {
        !node_b_remote_members(&node_b, "orders").is_empty()
    });
    assert!(
        propagated,
        "node B should see A's subscription as a remote pg member"
    );

    let remote = node_b_remote_members(&node_b, "orders");
    assert_eq!(remote.len(), 1, "exactly one remote subscriber");
    drop(channel_b);
}
/// A payload published on node B must be delivered to a subscriber on node A.
///
/// The test first waits until B lists a remote member for `orders`, then
/// publishes on B and polls A's subscription for an envelope carrying that
/// exact payload.
#[test]
fn publish_on_b_reaches_subscriber_on_a() {
    let addr_a = free_port();
    let addr_b = free_port();
    let node_a = Node::start("node-a@127.0.0.1", addr_a, vec![]);
    let node_b = Node::start("node-b@127.0.0.1", addr_b, vec![addr_a]);
    await_mutual_membership(&node_a, &node_b);

    let channel_a = node_a.channel("orders");
    let subscription = channel_a.subscribe().expect("subscribe on A");
    assert!(
        eventually(Duration::from_secs(10), || {
            !node_b_remote_members(&node_b, "orders").is_empty()
        }),
        "B should learn A's subscription before publishing"
    );

    let channel_b = node_b.channel("orders");
    let payload = br#"{"order":"cross-node-1"}"#.to_vec();
    channel_b.publish(&payload).expect("publish on B");

    // Success is decided on the payload only. A bare `try_next()` check would
    // consume an envelope and accept it without comparing its contents, so any
    // unrelated envelope would make the test pass.
    let received = eventually(Duration::from_secs(10), || {
        a_inbox_has_payload(&subscription, &payload)
    });
    assert!(received, "A's subscriber should receive B's publish");
}
/// A node that joins after a subscription already exists must still end up
/// listing that subscription as a remote member.
#[test]
fn late_joiner_is_backfilled_with_existing_subscriptions() {
    let addr_a = free_port();
    let addr_c = free_port();

    // A subscribes before C exists at all.
    let node_a = Node::start("node-a@127.0.0.1", addr_a, Vec::new());
    let channel_a = node_a.channel("events");
    let _subscription = channel_a.subscribe().expect("subscribe on A");

    // C joins afterwards, seeded with A's address.
    let node_c = Node::start("node-c@127.0.0.1", addr_c, vec![addr_a]);
    await_mutual_membership(&node_a, &node_c);

    let backfilled = eventually(Duration::from_secs(10), || {
        !node_b_remote_members(&node_c, "events").is_empty()
    });
    assert!(
        backfilled,
        "late joiner C should be backfilled with A's existing subscription"
    );
}
/// After node A is shut down and dropped, node B must forget A's remote
/// member while still delivering publishes to the subscriber on node C.
#[test]
fn dropping_a_node_purges_its_remote_members_and_survivors_still_deliver() {
    let addr_a = free_port();
    let addr_b = free_port();
    let addr_c = free_port();
    let node_a = Node::start("node-a@127.0.0.1", addr_a, Vec::new());
    let node_b = Node::start("node-b@127.0.0.1", addr_b, vec![addr_a]);
    let node_c = Node::start("node-c@127.0.0.1", addr_c, vec![addr_a, addr_b]);

    await_mutual_membership(&node_a, &node_b);
    let c_fully_connected = eventually(Duration::from_secs(10), || node_c.peer_count() >= 2);
    assert!(c_fully_connected, "C should connect to both A and B");

    let channel_a = node_a.channel("orders");
    let sub_a = channel_a.subscribe().expect("subscribe on A");
    let channel_c = node_c.channel("orders");
    let sub_c = channel_c.subscribe().expect("subscribe on C");

    // Number of remote `orders` members as currently seen from B.
    let remote_on_b = || node_b_remote_members(&node_b, "orders").len();

    let b_sees_both = eventually(Duration::from_secs(10), || remote_on_b() >= 2);
    assert!(b_sees_both, "B should see remote members from both A and C");

    // Take A down: subscription first, then channel, then the node itself.
    drop(sub_a);
    drop(channel_a);
    node_a.shutdown();
    drop(node_a);

    let a_purged = eventually(Duration::from_secs(15), || remote_on_b() == 1);
    assert!(
        a_purged,
        "B should purge A's remote member after A drops, leaving only C"
    );

    let channel_b = node_b.channel("orders");
    let payload = br#"{"order":"after-a-dropped"}"#.to_vec();
    channel_b
        .publish(&payload)
        .expect("publish on B after A dropped");

    let delivered_to_c = eventually(Duration::from_secs(10), || {
        a_inbox_has_payload(&sub_c, &payload)
    });
    assert!(
        delivered_to_c,
        "C's surviving subscriber should still receive B's publish after A dropped"
    );
}
/// Lists the remote members that `node` has recorded for the group named
/// `channel` in the default scope of its pg registry.
///
/// Despite the name, this works for any node; the tests also call it with
/// node C.
fn node_b_remote_members(node: &Node, channel: &str) -> Vec<beamr::distribution::pg::RemoteMember> {
    let sched = node.supervisor.scheduler();
    let atoms = sched.atom_table();
    let registry = sched.pg_registry();
    // The group is looked up by the interned form of the channel name.
    let group_atom = atoms.intern(channel);
    registry.remote_members(registry.default_scope(), group_atom)
}
/// Drains envelopes from `subscription` until one carries `payload`.
///
/// Returns `true` on the first matching envelope. Returns `false` once
/// `try_next` stops yielding envelopes, whether through `Ok(None)` or an
/// error. Envelopes that do not match are consumed and discarded.
fn a_inbox_has_payload(
    subscription: &liminal::channel::SubscriptionHandle,
    payload: &[u8],
) -> bool {
    // Stops pulling at the first `Ok(None)` or `Err`, as the loop form would.
    let mut pending = std::iter::from_fn(|| subscription.try_next().ok().flatten());
    pending.any(|envelope| envelope.payload == payload)
}
/// Debug output shows only the listen address; the remaining fields are
/// elided and the struct is rendered as non-exhaustive.
impl std::fmt::Debug for Node {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("Node");
        builder.field("listen_addr", &self.listen_addr);
        builder.finish_non_exhaustive()
    }
}