#![allow(clippy::uninlined_format_args)]
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use oxirs_cluster::stream_integration::StreamMessage;
use oxirs_cluster::streaming::cluster_sink::{SinkError, StreamSink};
use oxirs_stream::distributed::{
CoordinatorConfig, DistributedStreamCoordinator, EventShipper, InProcessShipperTransport,
ShippedEvent, ShipperConfig, ShipperTransport,
};
#[derive(Default)]
struct MockClusterSink {
proposals: Mutex<Vec<StreamMessage>>,
commits: AtomicU64,
}
#[async_trait]
impl StreamSink for MockClusterSink {
async fn write_batch(&self, events: Vec<StreamMessage>) -> Result<(), SinkError> {
self.commits.fetch_add(1, Ordering::Relaxed);
self.proposals.lock().extend(events);
Ok(())
}
}
struct Node {
id: String,
coord: Arc<DistributedStreamCoordinator>,
rx: Mutex<Option<mpsc::Receiver<ShippedEvent>>>,
transport_rx: Mutex<Option<mpsc::Receiver<ShippedEvent>>>,
sink: Arc<MockClusterSink>,
}
async fn build_three_node_cluster(
n_shards: u32,
) -> (Vec<Arc<Node>>, Arc<InProcessShipperTransport>) {
let transport = Arc::new(InProcessShipperTransport::new(1024));
let mut nodes = Vec::new();
for id in ["n1", "n2", "n3"] {
let sink = Arc::new(MockClusterSink::default());
let shipper = Arc::new(EventShipper::new(
ShipperConfig {
local_node: id.into(),
},
transport.clone() as Arc<dyn ShipperTransport>,
));
let (tx, rx) = mpsc::channel(1024);
shipper.install_local_sink(tx);
let t_rx = transport.spawn_receiver(id.to_string());
let cfg = CoordinatorConfig {
coord_id: "cluster-1".into(),
local_node: id.into(),
n_shards,
stream_id: None,
idempotent_membership: true,
};
let coord =
Arc::new(DistributedStreamCoordinator::new(cfg, sink.clone(), shipper).expect("ok"));
nodes.push(Arc::new(Node {
id: id.into(),
coord,
rx: Mutex::new(Some(rx)),
transport_rx: Mutex::new(Some(t_rx)),
sink,
}));
}
for node in &nodes {
for peer in ["n1", "n2", "n3"] {
node.coord.join(peer.into()).await.expect("join");
}
}
(nodes, transport)
}
#[tokio::test]
async fn three_node_cluster_balances_shards() {
let (nodes, _t) = build_three_node_cluster(9).await;
let a = nodes[0].coord.current_assignment();
let b = nodes[1].coord.current_assignment();
let c = nodes[2].coord.current_assignment();
assert_eq!(a, b);
assert_eq!(b, c);
let counts = a.counts();
for count in counts.values() {
assert_eq!(*count, 3, "9 shards across 3 nodes should give 3 each");
}
}
#[tokio::test]
async fn three_node_cluster_routes_no_event_lost_on_kill() {
let (nodes, _t) = build_three_node_cluster(6).await;
let n1 = nodes[0].clone();
let n2 = nodes[1].clone();
let n3 = nodes[2].clone();
for i in 0..100 {
let key = format!("k-{i}");
let payload = serde_json::json!({"i": i});
n1.coord.route(&key, &payload).await.expect("route");
}
let drain = |node: &Arc<Node>| -> Vec<ShippedEvent> {
let mut out = Vec::new();
let mut rx_opt = node.rx.lock().take();
if let Some(rx) = rx_opt.as_mut() {
while let Ok(ev) = rx.try_recv() {
out.push(ev);
}
}
*node.rx.lock() = rx_opt;
let mut t_opt = node.transport_rx.lock().take();
if let Some(rx) = t_opt.as_mut() {
while let Ok(ev) = rx.try_recv() {
out.push(ev);
}
}
*node.transport_rx.lock() = t_opt;
out
};
let pre1 = drain(&n1);
let pre2 = drain(&n2);
let pre3 = drain(&n3);
let pre_total = pre1.len() + pre2.len() + pre3.len();
assert_eq!(pre_total, 100, "every routed event should be delivered");
n2.coord.leave("n1").await.expect("leave-on-n2");
n3.coord.leave("n1").await.expect("leave-on-n3");
let assn2 = n2.coord.current_assignment();
let assn3 = n3.coord.current_assignment();
assert_eq!(assn2, assn3, "surviving nodes converge on same assignment");
for owner in assn2.map.values() {
assert!(owner == "n2" || owner == "n3");
}
for i in 100..200 {
let key = format!("k-{i}");
let payload = serde_json::json!({"i": i});
n2.coord.route(&key, &payload).await.expect("route");
}
let post2 = drain(&n2);
let post3 = drain(&n3);
let post_total = post2.len() + post3.len();
assert_eq!(post_total, 100, "no events lost after node kill");
}
#[tokio::test]
async fn key_routing_is_deterministic_across_nodes() {
let (nodes, _t) = build_three_node_cluster(8).await;
let key = "deterministic-key";
let s1 = nodes[0].coord.shard_for_key_value(key).expect("shard");
let s2 = nodes[1].coord.shard_for_key_value(key).expect("shard");
let s3 = nodes[2].coord.shard_for_key_value(key).expect("shard");
assert_eq!(s1, s2);
assert_eq!(s2, s3);
let owner1 = nodes[0]
.coord
.current_assignment()
.owner_of(s1)
.cloned()
.expect("owner");
let owner2 = nodes[1]
.coord
.current_assignment()
.owner_of(s2)
.cloned()
.expect("owner");
assert_eq!(owner1, owner2);
}
#[tokio::test]
async fn coordinator_persists_assignment_through_sink() {
let (nodes, _t) = build_three_node_cluster(4).await;
for node in &nodes {
let commits = node.sink.commits.load(Ordering::Relaxed);
assert!(commits >= 3, "node {} commits = {}", node.id, commits);
}
}