use std::time::{Duration, Instant};
use tokio::time::sleep;
use crate::gossip::fanout::GossipFanout;
use super::node::{TcpClusterNode, TcpNodeConfig, TcpNodeError};
#[derive(Debug, Clone)]
pub struct NetworkStats {
pub node_count: usize,
pub converged: bool,
pub rounds_to_converge: usize,
pub total_time_ms: u64,
pub final_state_entries: usize,
}
pub struct TcpClusterNetwork {
nodes: Vec<TcpClusterNode>,
}
impl TcpClusterNetwork {
pub async fn spawn(
n: usize,
_base_port: u16,
fanout: GossipFanout,
gossip_interval_ms: u64,
) -> Result<Self, Box<dyn std::error::Error>> {
let mut nodes = Vec::with_capacity(n);
for i in 0..n {
let cfg = TcpNodeConfig {
node_id: format!("node-{i}"),
bind_addr: "127.0.0.1:0".parse()?,
fanout,
gossip_interval_ms,
};
let node = TcpClusterNode::start(cfg)
.await
.map_err(|e: TcpNodeError| Box::new(e) as Box<dyn std::error::Error>)?;
nodes.push(node);
}
let addrs: Vec<_> = nodes.iter().map(|n| n.addr()).collect();
for (i, node) in nodes.iter().enumerate() {
for (j, &addr) in addrs.iter().enumerate() {
if i != j {
node.add_peer(addr);
}
}
}
Ok(Self { nodes })
}
pub fn set_on(&self, idx: usize, key: &str, value: u64) {
self.nodes[idx].set(key, value);
}
pub fn set_with_version_on(&self, idx: usize, key: &str, value: u64, version: u64) {
self.nodes[idx].set_with_version(key, value, version);
}
pub fn get_on(&self, idx: usize, key: &str) -> Option<u64> {
self.nodes[idx].get(key)
}
pub async fn wait_converged(
&self,
key: &str,
expected_value: u64,
max_wait: Duration,
) -> NetworkStats {
let start = Instant::now();
let mut rounds = 0usize;
loop {
let all_match = self
.nodes
.iter()
.all(|n| n.get(key) == Some(expected_value));
if all_match {
let elapsed = start.elapsed();
return NetworkStats {
node_count: self.nodes.len(),
converged: true,
rounds_to_converge: rounds,
total_time_ms: elapsed.as_millis() as u64,
final_state_entries: self.nodes.first().map_or(0, |n| n.state_len()),
};
}
if start.elapsed() >= max_wait {
let elapsed = start.elapsed();
return NetworkStats {
node_count: self.nodes.len(),
converged: false,
rounds_to_converge: rounds,
total_time_ms: elapsed.as_millis() as u64,
final_state_entries: self.nodes.first().map_or(0, |n| n.state_len()),
};
}
sleep(Duration::from_millis(10)).await;
rounds += 1;
}
}
pub fn shutdown_all(&self) {
for node in &self.nodes {
node.shutdown();
}
}
pub fn node_count(&self) -> usize {
self.nodes.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_spawn_and_shutdown() {
let net = TcpClusterNetwork::spawn(3, 0, GossipFanout::Unbounded, 50)
.await
.expect("spawn");
assert_eq!(net.node_count(), 3);
net.shutdown_all();
}
#[tokio::test]
async fn test_set_and_get_on() {
let net = TcpClusterNetwork::spawn(2, 0, GossipFanout::Unbounded, 50)
.await
.expect("spawn");
net.set_on(0, "x", 77);
assert_eq!(net.get_on(0, "x"), Some(77));
net.shutdown_all();
}
}