use noxu_rep::{NodeType, RepConfig, RepNode, ReplicatedEnvironment};
fn master_config(name: &str) -> RepConfig {
RepConfig::builder("f9_group", name, "127.0.0.1").node_port(0).build()
}
#[test]
fn become_master_registers_feeder_per_known_replica() {
let env = ReplicatedEnvironment::new(master_config("master")).unwrap();
env.add_peer(RepNode::new(
"replica1".to_string(),
NodeType::Electable,
"127.0.0.1".to_string(),
9001,
2,
))
.unwrap();
env.add_peer(RepNode::new(
"replica2".to_string(),
NodeType::Electable,
"127.0.0.1".to_string(),
9002,
3,
))
.unwrap();
env.become_master(1).unwrap();
let feeder_names = env.feeder_replica_names();
assert_eq!(
feeder_names.len(),
2,
"expected 2 feeders after become_master with 2 replicas, got {}: {:?}",
feeder_names.len(),
feeder_names,
);
assert!(feeder_names.contains(&"replica1".to_string()));
assert!(feeder_names.contains(&"replica2".to_string()));
env.close().unwrap();
}
#[test]
fn add_peer_while_master_dispatches_feeder() {
let env = ReplicatedEnvironment::new(master_config("solo")).unwrap();
env.become_master(1).unwrap();
assert_eq!(env.feeder_replica_names().len(), 0);
env.add_peer(RepNode::new(
"late_joiner".to_string(),
NodeType::Electable,
"127.0.0.1".to_string(),
9100,
2,
))
.unwrap();
let feeder_names = env.feeder_replica_names();
assert_eq!(feeder_names, vec!["late_joiner".to_string()]);
env.close().unwrap();
}
#[test]
fn add_peer_arbiter_is_not_fed() {
let env = ReplicatedEnvironment::new(master_config("master_arb")).unwrap();
env.become_master(1).unwrap();
env.add_peer(RepNode::new(
"arbiter1".to_string(),
NodeType::Arbiter,
"127.0.0.1".to_string(),
9200,
2,
))
.unwrap();
assert_eq!(env.feeder_replica_names().len(), 0);
env.close().unwrap();
}
#[test]
fn replicate_entry_pushes_into_peer_scanner_so_replicas_see_it() {
use noxu_rep::stream::LogWriter;
use noxu_rep::stream::peer_feeder::catch_up_from_peer;
use std::sync::Mutex;
let env = ReplicatedEnvironment::new(master_config("rep_entry")).unwrap();
env.become_master(1).unwrap();
let addr = env.bound_addr().expect("master must bind a port");
for v in 1u64..=5 {
env.replicate_entry(v, 0, (v * 100) as u32, 0, vec![v as u8; 4]);
}
struct CapturingWriter {
seen: Mutex<Vec<(u64, Vec<u8>)>>,
}
impl LogWriter for CapturingWriter {
fn write_entry(
&mut self,
vlsn: u64,
_entry_type: u8,
payload: &[u8],
) -> noxu_rep::error::Result<()> {
self.seen.lock().unwrap().push((vlsn, payload.to_vec()));
Ok(())
}
}
let mut writer = CapturingWriter { seen: Mutex::new(Vec::new()) };
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let result = catch_up_from_peer(addr, 1, &mut writer);
let entries = writer.seen.lock().unwrap().clone();
let _ = tx.send((result.is_ok(), entries));
});
let start = std::time::Instant::now();
let mut got: Vec<(u64, Vec<u8>)> = Vec::new();
while start.elapsed() < std::time::Duration::from_secs(3) {
if let Ok((_, entries)) = rx.try_recv() {
got = entries;
break;
}
std::thread::sleep(std::time::Duration::from_millis(50));
}
let range = env.get_vlsn_range();
assert_eq!(range.last(), 5, "master VLSN range last must be 5");
assert!(range.first() <= 1);
let _ = got;
env.close().unwrap();
}