noxu_rep/test_harness.rs
1//! In-memory test harness for replication group testing.
2//!
3//! This module provides Rust analogs of the JE `RepTestBase` /
4//! `RepEnvInfo` (`com.sleepycat.je.rep.impl.RepTestBase` and
5//! `com.sleepycat.je.rep.utilint.RepTestUtils`) classes, used by
6//! the JE replication TCK to bring up multi-node groups in-process,
7//! drive lifecycle transitions, replicate VLSN entries, and assert
8//! invariants without depending on a real network.
9//!
10//! # Design philosophy
11//!
12//! noxu-rep's [`ReplicatedEnvironment`] is already drivable purely
13//! in-process: `become_master`, `become_replica`, `register_vlsn`,
14//! and `apply_entry` operate on the local node's state machine
15//! without requiring any TCP wiring (the TCP receive loop in
16//! `become_replica` is only spawned when an `EnvironmentImpl` has
17//! been attached via `with_environment`). This harness builds on
18//! that property to provide a JE-style group abstraction that:
19//!
20//! * **Never opens TCP sockets.** All "replication" between nodes
21//! is driven by the harness calling the appropriate method on each
22//! node directly. This is the moral equivalent of running the
23//! group with an in-memory [`crate::net::LocalChannel`] transport,
24//! but without the protocol overhead — perfect for testing
25//! higher-level invariants (commit ordering, failover, group
26//! membership).
27//! * **Avoids hangs.** Tests that use this harness cannot hang on
28//! real network coordination because there is no real network.
29//! Every operation is bounded.
30//! * **Stays close to JE TCK shape.** Method names mirror JE's
31//! `RepEnvInfo` / `RepTestBase` so port translations are
32//! mechanical: `openEnv` → [`RepEnvInfo::open_env`], `closeEnv`
33//! → [`RepEnvInfo::close_env`], `createGroup` →
34//! [`RepTestBase::create_group`], `findMaster` →
35//! [`RepTestBase::find_master`], `populateDB` →
36//! [`RepTestBase::populate_db`], etc.
37//!
38//! Tests that exercise the real network protocol layer should
39//! continue to use `cluster_integration_test.rs`-style
40//! [`crate::net::TcpChannel`] / [`crate::net::TcpChannelListener`]
41//! setups. This harness is for the layer above.
42//!
43//! # Quick start
44//!
45//! ```no_run
46//! # #[cfg(any(test, feature = "test-harness"))]
47//! # fn demo() {
48//! use noxu_rep::test_harness::RepTestBase;
49//!
50//! // Spin up a 3-node group, elect node 0 as master, replicate
51//! // 100 entries, and assert all replicas applied them.
52//! let mut group = RepTestBase::builder("demo_group").group_size(3).build();
53//! group.create_group(/* master_term */ 1).unwrap();
//! group.populate_db(/* start_vlsn */ 1, /* count */ 100).unwrap();
55//! group.assert_all_at_vlsn(100);
56//! group.shutdown_all();
57//! # }
58//! ```
59
60use std::sync::Arc;
61use std::sync::atomic::{AtomicU16, Ordering};
62use std::time::{Duration, Instant};
63
64use crate::error::{RepError, Result};
65use crate::node_state::NodeState;
66use crate::node_type::NodeType;
67use crate::quorum_policy::QuorumPolicy;
68use crate::rep_config::RepConfig;
69use crate::replicated_environment::ReplicatedEnvironment;
70use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
71
72// ---------------------------------------------------------------------------
73// Port allocation
74// ---------------------------------------------------------------------------
75
/// Process-wide cursor pointing at the next unallocated harness port.
///
/// The in-process harness never binds these ports, but `RepConfig` needs a
/// port value, and handing every group its own range keeps a future
/// TCP-backed extension of the harness forward-compatible.
static NEXT_BASE_PORT: AtomicU16 = AtomicU16::new(40_000);

/// Reserve a contiguous block of `group_size + 16` port numbers and return
/// the first one.
///
/// The extra 16 ports leave headroom for mid-test `add_peer` expansions,
/// monitors, etc. Blocks are carved out of `40_000..60_000`; when the next
/// block would run past 60_000 the allocator wraps and hands out a block
/// starting at 40_000 again, so a returned range never reaches into the
/// ephemeral port range. (A request wider than the whole window still starts
/// at 40_000 but necessarily extends past its end.)
fn alloc_base_port(group_size: usize) -> u16 {
    const RANGE_START: u16 = 40_000;
    const RANGE_END: u16 = 60_000;

    // Clamp before narrowing so an oversized `group_size` saturates instead
    // of silently truncating to a small span.
    let span = (group_size.min(usize::from(u16::MAX)) as u16).saturating_add(16);

    let mut current = NEXT_BASE_PORT.load(Ordering::SeqCst);
    loop {
        // Use the cursor as-is when the whole block fits below RANGE_END;
        // otherwise restart at the bottom of the window.
        let base = match current.checked_add(span) {
            Some(end) if end <= RANGE_END => current,
            _ => RANGE_START,
        };
        let next = base.saturating_add(span);
        match NEXT_BASE_PORT.compare_exchange(
            current,
            next,
            Ordering::SeqCst,
            Ordering::SeqCst,
        ) {
            Ok(_) => return base,
            // Another thread moved the cursor first; retry from its value.
            Err(actual) => current = actual,
        }
    }
}
103
104// ---------------------------------------------------------------------------
105// RepEnvInfo
106// ---------------------------------------------------------------------------
107
108/// Per-node information held by [`RepTestBase`].
109///
110/// Mirrors JE's `RepTestUtils.RepEnvInfo` — owns one node's
111/// configuration and (optionally) its [`ReplicatedEnvironment`]
112/// once `open_env` has been called.
113///
114/// Cloning a `RepEnvInfo` shares the underlying `Arc<ReplicatedEnvironment>`
115/// so the harness can hand out cheap references without giving up ownership.
116pub struct RepEnvInfo {
117 config: RepConfig,
118 /// Node id (1-based, matching JE convention).
119 node_id: u32,
120 /// `None` until `open_env` is called.
121 env: Option<Arc<ReplicatedEnvironment>>,
122}
123
124impl RepEnvInfo {
125 /// Construct a `RepEnvInfo` with a configuration but no open environment.
126 /// Mirrors `new RepEnvInfo(envHome, repConfig, envConfig)` in JE.
127 pub fn new(config: RepConfig, node_id: u32) -> Self {
128 Self { config, node_id, env: None }
129 }
130
131 /// Open the [`ReplicatedEnvironment`] for this node. After `open_env`
132 /// the node is in [`NodeState::Detached`] (as just-opened) until a
133 /// `become_master` / `become_replica` call drives a transition.
134 ///
135 /// Mirrors JE's `RepEnvInfo.openEnv`.
136 pub fn open_env(&mut self) -> Result<Arc<ReplicatedEnvironment>> {
137 if self.env.is_some() {
138 return Err(RepError::StateError(
139 "rep env already exists".to_string(),
140 ));
141 }
142 let env = Arc::new(ReplicatedEnvironment::new(self.config.clone())?);
143 env.init_self_weak();
144 self.env = Some(Arc::clone(&env));
145 Ok(env)
146 }
147
148 /// Close the environment and drop our handle. After `close_env`,
149 /// `open_env` may be called again to simulate a node restart.
150 ///
151 /// Mirrors JE's `RepEnvInfo.closeEnv`.
152 pub fn close_env(&mut self) -> Result<()> {
153 if let Some(env) = self.env.take() {
154 env.close()?;
155 }
156 Ok(())
157 }
158
159 /// Drop the env handle without calling `close()` — simulates a crash.
160 /// Subsequent `open_env` will create a fresh node.
161 ///
162 /// Mirrors JE's `RepEnvInfo.abnormalCloseEnv`.
163 pub fn abnormal_close_env(&mut self) {
164 let _ = self.env.take();
165 }
166
167 /// Returns the open env handle, panicking if `open_env` has not been
168 /// called. Use [`RepEnvInfo::env`] for a fallible accessor.
169 pub fn get_env(&self) -> Arc<ReplicatedEnvironment> {
170 self.env.as_ref().expect("open_env not called yet").clone()
171 }
172
173 /// Returns the open env handle, or `None` if not yet opened.
174 pub fn env(&self) -> Option<&Arc<ReplicatedEnvironment>> {
175 self.env.as_ref()
176 }
177
178 /// Returns the [`RepConfig`] for this node.
179 pub fn rep_config(&self) -> &RepConfig {
180 &self.config
181 }
182
183 /// Returns the node name (`config.node_name`).
184 pub fn node_name(&self) -> &str {
185 &self.config.node_name
186 }
187
188 /// Returns the 1-based node id.
189 pub fn node_id(&self) -> u32 {
190 self.node_id
191 }
192
193 // ---- State accessors (JE: isMaster / isReplica / isUnknown) ----
194
195 /// Returns `true` iff the node is currently in [`NodeState::Master`].
196 pub fn is_master(&self) -> bool {
197 self.env.as_ref().is_some_and(|e| e.get_state() == NodeState::Master)
198 }
199
200 /// Returns `true` iff the node is currently in [`NodeState::Replica`].
201 pub fn is_replica(&self) -> bool {
202 self.env.as_ref().is_some_and(|e| e.get_state() == NodeState::Replica)
203 }
204
205 /// Returns `true` iff the node is currently in [`NodeState::Unknown`].
206 pub fn is_unknown(&self) -> bool {
207 self.env.as_ref().is_some_and(|e| e.get_state() == NodeState::Unknown)
208 }
209
210 /// Returns the current node state, or `None` if the env is not open.
211 pub fn state(&self) -> Option<NodeState> {
212 self.env.as_ref().map(|e| e.get_state())
213 }
214
215 /// Returns the current VLSN, or `0` if the env is not open.
216 pub fn current_vlsn(&self) -> u64 {
217 self.env.as_ref().map(|e| e.get_current_vlsn()).unwrap_or(0)
218 }
219}
220
221// ---------------------------------------------------------------------------
222// RepTestBase + builder
223// ---------------------------------------------------------------------------
224
225/// JE-style replication group test fixture.
226///
227/// Mirrors JE's `RepTestBase` (`com.sleepycat.je.rep.impl.RepTestBase`).
228/// Encapsulates a group of `N` nodes that share a group name, port range,
229/// and election policy, and provides the lifecycle / replication / assertion
230/// helpers that the JE rep TCK uses.
231///
232/// Use [`RepTestBase::builder`] to construct one; call
233/// [`RepTestBase::create_group`] to bring up all nodes; call
234/// [`RepTestBase::shutdown_all`] (or rely on `Drop`) to tear them down.
235pub struct RepTestBase {
236 group_name: String,
237 nodes: Vec<RepEnvInfo>,
238 /// Cached election term used by [`RepTestBase::create_group`] and
239 /// [`RepTestBase::failover_to`]. Each successful failover increments
240 /// this so that subsequent `become_master` calls observe a strictly
241 /// increasing term.
242 next_term: std::cell::Cell<u64>,
243}
244
245impl RepTestBase {
246 /// Start building a new group with the given group name.
247 pub fn builder(group_name: impl Into<String>) -> RepTestBaseBuilder {
248 RepTestBaseBuilder::new(group_name)
249 }
250
251 /// Number of nodes in the group.
252 pub fn group_size(&self) -> usize {
253 self.nodes.len()
254 }
255
256 /// Borrow node at index `idx` (0-based — JE's `repEnvInfo[i]`).
257 pub fn node(&self, idx: usize) -> &RepEnvInfo {
258 &self.nodes[idx]
259 }
260
261 /// Borrow node at index `idx` mutably.
262 pub fn node_mut(&mut self, idx: usize) -> &mut RepEnvInfo {
263 &mut self.nodes[idx]
264 }
265
266 /// Borrow all nodes.
267 pub fn nodes(&self) -> &[RepEnvInfo] {
268 &self.nodes
269 }
270
271 /// Borrow all nodes mutably.
272 pub fn nodes_mut(&mut self) -> &mut [RepEnvInfo] {
273 &mut self.nodes
274 }
275
276 /// Returns the group name.
277 pub fn group_name(&self) -> &str {
278 &self.group_name
279 }
280
281 // ---- Lifecycle ----
282
283 /// Open every node's env, elect node 0 as master with `term`, and join
284 /// nodes 1..N as replicas pointing at node 0.
285 ///
286 /// Mirrors JE's `RepTestBase.createGroup` (which opens N nodes and
287 /// expects the first to become master, the rest replicas).
288 pub fn create_group(&mut self, term: u64) -> Result<()> {
289 self.create_group_of_size(self.nodes.len(), term)
290 }
291
292 /// Same as [`Self::create_group`] but only brings up the first
293 /// `first_n` nodes — JE's `createGroup(int firstn)` overload.
294 pub fn create_group_of_size(
295 &mut self,
296 first_n: usize,
297 term: u64,
298 ) -> Result<()> {
299 if first_n == 0 || first_n > self.nodes.len() {
300 return Err(RepError::ConfigError(format!(
301 "first_n ({first_n}) must be in 1..={}",
302 self.nodes.len()
303 )));
304 }
305
306 // Open all envs first so each node knows about its peers via the
307 // GroupService / RepGroup state.
308 for node in &mut self.nodes[..first_n] {
309 if node.env.is_none() {
310 node.open_env()?;
311 }
312 }
313
314 // Add every other node as a peer of every node so that
315 // `get_rep_group()` reflects the topology. This mirrors JE's
316 // helper-host handshake without needing TCP.
317 let peer_specs: Vec<crate::rep_node::RepNode> = self.nodes[..first_n]
318 .iter()
319 .map(|n| {
320 crate::rep_node::RepNode::new(
321 n.config.node_name.clone(),
322 n.config.node_type,
323 n.config.node_host.clone(),
324 n.config.node_port,
325 n.node_id,
326 )
327 })
328 .collect();
329
330 for node in &self.nodes[..first_n] {
331 let env = node.get_env();
332 for peer in &peer_specs {
333 if peer.name == node.config.node_name {
334 continue;
335 }
336 // Best-effort: ignore "already exists" errors.
337 let _ = env.add_peer(peer.clone());
338 }
339 }
340
341 // Elect node 0 as master.
342 self.nodes[0].get_env().become_master(term)?;
343 let master_name = self.nodes[0].config.node_name.clone();
344
345 // Other nodes become replicas pointing at node 0.
346 for node in &self.nodes[1..first_n] {
347 node.get_env().become_replica(&master_name)?;
348 }
349
350 self.next_term.set(term + 1);
351 Ok(())
352 }
353
354 /// Close every node's env (master last, to avoid spurious elections —
355 /// matches JE's `closeNodes`).
356 pub fn shutdown_all(&mut self) {
357 let mut master_idx: Option<usize> = None;
358 for (idx, node) in self.nodes.iter_mut().enumerate() {
359 if node.is_master() {
360 master_idx = Some(idx);
361 continue;
362 }
363 let _ = node.close_env();
364 }
365 if let Some(idx) = master_idx {
366 let _ = self.nodes[idx].close_env();
367 }
368 }
369
370 // ---- Master / replica accessors ----
371
372 /// Find the unique master, or `None` if no node is currently master.
373 /// Mirrors JE's `RepTestBase.findMaster`.
374 pub fn find_master(&self) -> Option<&RepEnvInfo> {
375 self.nodes.iter().find(|n| n.is_master())
376 }
377
378 /// Find the master, or `None` — mutable variant.
379 pub fn find_master_mut(&mut self) -> Option<&mut RepEnvInfo> {
380 self.nodes.iter_mut().find(|n| n.is_master())
381 }
382
383 /// Index of the unique master, or `None`.
384 pub fn find_master_idx(&self) -> Option<usize> {
385 self.nodes.iter().position(|n| n.is_master())
386 }
387
388 /// All replica nodes.
389 pub fn replicas(&self) -> Vec<&RepEnvInfo> {
390 self.nodes.iter().filter(|n| n.is_replica()).collect()
391 }
392
393 /// Wait up to `timeout` for some node to be master, polling at
394 /// `Duration::from_millis(20)` intervals. Returns the master's
395 /// index on success. Mirrors JE's `findMasterWait`.
396 pub fn await_master(&self, timeout: Duration) -> Result<usize> {
397 let deadline = Instant::now() + timeout;
398 loop {
399 if let Some(idx) = self.find_master_idx() {
400 return Ok(idx);
401 }
402 if Instant::now() >= deadline {
403 return Err(RepError::StateError(format!(
404 "timeout: no master after {:?}",
405 timeout
406 )));
407 }
408 std::thread::sleep(Duration::from_millis(20));
409 }
410 }
411
412 /// Wait up to `timeout` for node `idx` to enter `target` state.
413 pub fn await_state(
414 &self,
415 idx: usize,
416 target: NodeState,
417 timeout: Duration,
418 ) -> Result<()> {
419 let deadline = Instant::now() + timeout;
420 loop {
421 if self.nodes[idx].state() == Some(target) {
422 return Ok(());
423 }
424 if Instant::now() >= deadline {
425 return Err(RepError::StateError(format!(
426 "timeout: node {} did not reach {:?} after {:?} (current: {:?})",
427 idx,
428 target,
429 timeout,
430 self.nodes[idx].state(),
431 )));
432 }
433 std::thread::sleep(Duration::from_millis(20));
434 }
435 }
436
437 /// Wait up to `timeout` for node `idx`'s VLSN to reach at least `vlsn`.
438 pub fn await_vlsn_at_least(
439 &self,
440 idx: usize,
441 vlsn: u64,
442 timeout: Duration,
443 ) -> Result<()> {
444 let deadline = Instant::now() + timeout;
445 loop {
446 if self.nodes[idx].current_vlsn() >= vlsn {
447 return Ok(());
448 }
449 if Instant::now() >= deadline {
450 return Err(RepError::StateError(format!(
451 "timeout: node {} did not reach VLSN {} after {:?} (current: {})",
452 idx,
453 vlsn,
454 timeout,
455 self.nodes[idx].current_vlsn(),
456 )));
457 }
458 std::thread::sleep(Duration::from_millis(20));
459 }
460 }
461
462 // ---- Replication operations ----
463
464 /// Register a single VLSN on the master and apply it to every other
465 /// node (acting as replicas). This is the in-process moral equivalent
466 /// of "the master commits the txn, and the feeder streams it".
467 ///
468 /// `entry_type` is the replica-side `apply_entry` discriminator (a `u8`
469 /// that on the JE side selects between LN / commit / abort entries).
470 pub fn replicate_one(
471 &self,
472 vlsn: u64,
473 file: u32,
474 offset: u32,
475 entry_type: u8,
476 ) -> Result<()> {
477 let master_idx = self.find_master_idx().ok_or_else(|| {
478 RepError::StateError("no master to replicate from".to_string())
479 })?;
480 let master = self.nodes[master_idx].get_env();
481 master.register_vlsn(vlsn, file, offset);
482
483 for (i, node) in self.nodes.iter().enumerate() {
484 if i == master_idx || !node.is_replica() {
485 continue;
486 }
487 node.get_env().apply_entry(vlsn, entry_type, vec![0u8; 8])?;
488 }
489 Ok(())
490 }
491
492 /// Replicate `count` VLSN entries starting at `start_vlsn`. Mirrors
493 /// JE's `populateDB(rep, dbName, start, n)` for the harness layer:
494 /// the master records each VLSN and replicas apply it in order.
495 pub fn populate_db(&self, start_vlsn: u64, count: u64) -> Result<()> {
496 for offset in 0..count {
497 let vlsn = start_vlsn + offset;
498 // entry_type=0 ⇒ generic LN_TRANSACTIONAL marker on the apply
499 // side; the harness does not exercise type-specific logic.
500 self.replicate_one(vlsn, 0, (vlsn as u32).wrapping_mul(16), 0)?;
501 }
502 Ok(())
503 }
504
505 /// Same as [`Self::populate_db`] but only writes to the master and
506 /// leaves replicas in the dust — useful for partition / catch-up tests.
507 pub fn populate_master_only(
508 &self,
509 start_vlsn: u64,
510 count: u64,
511 ) -> Result<()> {
512 let master = self.find_master().ok_or_else(|| {
513 RepError::StateError("no master to populate".to_string())
514 })?;
515 for offset in 0..count {
516 let vlsn = start_vlsn + offset;
517 master.get_env().register_vlsn(
518 vlsn,
519 0,
520 (vlsn as u32).wrapping_mul(16),
521 );
522 }
523 Ok(())
524 }
525
526 /// Replay `start_vlsn..start_vlsn+count` on a single replica — used to
527 /// simulate a replica catching up after a partition.
528 pub fn catch_up_replica(
529 &self,
530 replica_idx: usize,
531 start_vlsn: u64,
532 count: u64,
533 ) -> Result<()> {
534 let env = self.nodes[replica_idx].get_env();
535 for offset in 0..count {
536 let vlsn = start_vlsn + offset;
537 env.apply_entry(vlsn, 0, vec![0u8; 8])?;
538 }
539 Ok(())
540 }
541
542 // ---- Failover ----
543
544 /// Close the current master; mirrors JE's `leaveGroupAllButMaster`'s
545 /// inverse — kill the master, leaving replicas in [`NodeState::Replica`]
546 /// until a [`Self::failover_to`] call drives a new election.
547 ///
548 /// Returns the index of the closed master.
549 pub fn close_master(&mut self) -> Result<usize> {
550 let idx = self.find_master_idx().ok_or_else(|| {
551 RepError::StateError("no master to close".to_string())
552 })?;
553 self.nodes[idx].close_env()?;
554 Ok(idx)
555 }
556
557 /// Drive replica `replica_idx` through `Replica → Unknown → Master`
558 /// using a fresh term, then point all other live replicas at the new
559 /// master. Mirrors JE's `transferMaster` for the in-process harness.
560 pub fn failover_to(&mut self, replica_idx: usize) -> Result<()> {
561 let term = self.next_term.get();
562 self.next_term.set(term + 1);
563
564 let target_env = self.nodes[replica_idx].get_env();
565 target_env.ensure_unknown_state()?;
566 target_env.become_master(term)?;
567
568 let new_master_name = self.nodes[replica_idx].config.node_name.clone();
569 for (i, node) in self.nodes.iter().enumerate() {
570 if i == replica_idx {
571 continue;
572 }
573 if node.env.is_none() {
574 continue;
575 }
576 // Skip nodes that are already master (shouldn't happen) or
577 // detached / shutdown.
578 let env = node.get_env();
579 let s = env.get_state();
580 if matches!(s, NodeState::Detached | NodeState::Shutdown) {
581 continue;
582 }
583 env.ensure_unknown_state()?;
584 env.become_replica(&new_master_name)?;
585 }
586 Ok(())
587 }
588
589 // ---- Assertions ----
590
591 /// Assert every node currently in [`NodeState::Master`] or
592 /// [`NodeState::Replica`] reports `vlsn` as its `current_vlsn`.
593 /// Panics on mismatch.
594 pub fn assert_all_at_vlsn(&self, vlsn: u64) {
595 for node in &self.nodes {
596 if !(node.is_master() || node.is_replica()) {
597 continue;
598 }
599 assert_eq!(
600 node.current_vlsn(),
601 vlsn,
602 "node {} ({:?}) at unexpected VLSN",
603 node.node_name(),
604 node.state(),
605 );
606 }
607 }
608
609 /// Assert node `idx` is in `state`.
610 pub fn assert_state(&self, idx: usize, state: NodeState) {
611 assert_eq!(
612 self.nodes[idx].state(),
613 Some(state),
614 "node {} ({}) wrong state",
615 idx,
616 self.nodes[idx].node_name(),
617 );
618 }
619}
620
621impl Drop for RepTestBase {
622 fn drop(&mut self) {
623 // Best-effort cleanup if the test forgot to call shutdown_all.
624 self.shutdown_all();
625 }
626}
627
628// ---------------------------------------------------------------------------
629// Builder
630// ---------------------------------------------------------------------------
631
632/// Builder for [`RepTestBase`]. Use [`RepTestBase::builder`] to construct.
633pub struct RepTestBaseBuilder {
634 group_name: String,
635 group_size: usize,
636 base_port: Option<u16>,
637 node_type: NodeType,
638 election_timeout: Option<Duration>,
639 quorum_policy: Option<QuorumPolicy>,
640 name_prefix: Option<String>,
641 /// Override the node type for specific indices (e.g. mark node 4 as
642 /// Secondary in an otherwise-Electable group).
643 node_type_overrides: Vec<(usize, NodeType)>,
644}
645
646impl RepTestBaseBuilder {
647 fn new(group_name: impl Into<String>) -> Self {
648 Self {
649 group_name: group_name.into(),
650 group_size: 3,
651 base_port: None,
652 node_type: NodeType::Electable,
653 election_timeout: None,
654 quorum_policy: None,
655 name_prefix: None,
656 node_type_overrides: Vec::new(),
657 }
658 }
659
660 /// Number of nodes in the group (default: 3).
661 pub fn group_size(mut self, n: usize) -> Self {
662 self.group_size = n;
663 self
664 }
665
666 /// Base port; node `i` will use `base_port + i`. Default: a process-wide
667 /// monotonically allocated port range that does not overlap other
668 /// concurrently-running harness groups.
669 pub fn base_port(mut self, p: u16) -> Self {
670 self.base_port = Some(p);
671 self
672 }
673
674 /// Default node type for every node (default: [`NodeType::Electable`]).
675 pub fn node_type(mut self, t: NodeType) -> Self {
676 self.node_type = t;
677 self
678 }
679
680 /// Override the node type for a specific index. May be called multiple
681 /// times; later calls override earlier ones for the same index.
682 pub fn override_node_type(mut self, idx: usize, t: NodeType) -> Self {
683 self.node_type_overrides.push((idx, t));
684 self
685 }
686
687 /// Election timeout passed to [`RepConfig`].
688 pub fn election_timeout(mut self, t: Duration) -> Self {
689 self.election_timeout = Some(t);
690 self
691 }
692
693 /// Quorum policy passed to [`RepConfig`].
694 pub fn quorum_policy(mut self, q: QuorumPolicy) -> Self {
695 self.quorum_policy = Some(q);
696 self
697 }
698
699 /// Per-node name prefix; the i-th node will be named
700 /// `"{prefix}{i+1}"`. Default: derived from the group name.
701 pub fn name_prefix(mut self, p: impl Into<String>) -> Self {
702 self.name_prefix = Some(p.into());
703 self
704 }
705
706 /// Construct the [`RepTestBase`]. Does NOT open any envs — call
707 /// [`RepTestBase::create_group`] to drive the lifecycle.
708 pub fn build(self) -> RepTestBase {
709 let base_port =
710 self.base_port.unwrap_or_else(|| alloc_base_port(self.group_size));
711 let prefix = self
712 .name_prefix
713 .unwrap_or_else(|| format!("{}_n", self.group_name));
714
715 let mut overrides = std::collections::HashMap::new();
716 for (idx, t) in self.node_type_overrides {
717 overrides.insert(idx, t);
718 }
719
720 let mut nodes = Vec::with_capacity(self.group_size);
721 for i in 0..self.group_size {
722 let node_name = format!("{}{}", prefix, i + 1);
723 let node_type = *overrides.get(&i).unwrap_or(&self.node_type);
724 let port = base_port + i as u16;
725
726 let mut b =
727 RepConfig::builder(&self.group_name, &node_name, "127.0.0.1")
728 .node_port(port)
729 .node_type(node_type);
730 if let Some(t) = self.election_timeout {
731 b = b.election_timeout(t);
732 }
733 if let Some(q) = self.quorum_policy.clone() {
734 b = b.quorum_policy(q);
735 }
736 let config = b.build();
737 nodes.push(RepEnvInfo::new(config, (i + 1) as u32));
738 }
739
740 RepTestBase {
741 group_name: self.group_name,
742 nodes,
743 next_term: std::cell::Cell::new(1),
744 }
745 }
746}
747
748// ---------------------------------------------------------------------------
749// Listener helpers
750// ---------------------------------------------------------------------------
751
752/// `StateChangeListener` that counts master / replica / unknown / detached
753/// / shutdown transitions. Mirrors JE's `MasterListener` and friends but
754/// generalized — every test that wanted a "wait for master became X"
755/// listener can read the relevant counter.
756#[derive(Default)]
757pub struct CountingListener {
758 pub master: std::sync::atomic::AtomicUsize,
759 pub replica: std::sync::atomic::AtomicUsize,
760 pub unknown: std::sync::atomic::AtomicUsize,
761 pub detached: std::sync::atomic::AtomicUsize,
762 pub shutdown: std::sync::atomic::AtomicUsize,
763}
764
765impl CountingListener {
766 pub fn new() -> Arc<Self> {
767 Arc::new(Self::default())
768 }
769
770 pub fn master_count(&self) -> usize {
771 self.master.load(Ordering::SeqCst)
772 }
773 pub fn replica_count(&self) -> usize {
774 self.replica.load(Ordering::SeqCst)
775 }
776 pub fn unknown_count(&self) -> usize {
777 self.unknown.load(Ordering::SeqCst)
778 }
779 pub fn detached_count(&self) -> usize {
780 self.detached.load(Ordering::SeqCst)
781 }
782 pub fn shutdown_count(&self) -> usize {
783 self.shutdown.load(Ordering::SeqCst)
784 }
785}
786
787impl StateChangeListener for CountingListener {
788 fn on_state_change(&self, ev: StateChangeEvent) {
789 let counter = match ev.new_state {
790 NodeState::Master => &self.master,
791 NodeState::Replica => &self.replica,
792 NodeState::Unknown => &self.unknown,
793 NodeState::Detached => &self.detached,
794 NodeState::Shutdown => &self.shutdown,
795 };
796 counter.fetch_add(1, Ordering::SeqCst);
797 }
798}
799
800// ---------------------------------------------------------------------------
801// Tests for the harness itself
802// ---------------------------------------------------------------------------
803
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an `n`-node group called `name` and bring it up with node 0 as
    /// the term-1 master and every other node as its replica.
    fn started_group(name: &str, n: usize) -> RepTestBase {
        let mut group = RepTestBase::builder(name).group_size(n).build();
        group.create_group(1).unwrap();
        group
    }

    #[test]
    fn builder_produces_n_nodes_with_disjoint_names() {
        let group = RepTestBase::builder("hs1").group_size(4).build();
        assert_eq!(group.group_size(), 4);

        let names: Vec<&str> =
            group.nodes().iter().map(RepEnvInfo::node_name).collect();
        assert_eq!(names, ["hs1_n1", "hs1_n2", "hs1_n3", "hs1_n4"]);

        // Each node's port is exactly one above its predecessor's.
        let ports: Vec<u16> = group
            .nodes()
            .iter()
            .map(|n| n.rep_config().node_port)
            .collect();
        for (prev, next) in ports.iter().zip(ports.iter().skip(1)) {
            assert!(
                *next == *prev + 1,
                "ports must be consecutive: {:?}",
                ports
            );
        }
    }

    #[test]
    fn create_group_elects_master_and_replicas() {
        let group = started_group("hs2", 3);

        assert!(group.node(0).is_master(), "node 0 must be master");
        assert!(group.node(1).is_replica(), "node 1 must be replica");
        assert!(group.node(2).is_replica(), "node 2 must be replica");

        let master = group.find_master().unwrap();
        assert_eq!(master.node_name(), "hs2_n1");
    }

    #[test]
    fn populate_db_advances_all_replicas() {
        let group = started_group("hs3", 3);

        group.populate_db(1, 50).unwrap();
        group.assert_all_at_vlsn(50);
    }

    #[test]
    fn failover_drives_replica_to_master() {
        let mut group = started_group("hs4", 3);

        // The original master writes ten entries, seen by everyone.
        group.populate_db(1, 10).unwrap();
        group.assert_all_at_vlsn(10);

        // Take the master (node 0) out of the group.
        let closed = group.close_master().unwrap();
        assert_eq!(closed, 0);

        // Promote node 1, a former replica.
        group.failover_to(1).unwrap();

        // Node 1 leads now and node 2 follows it.
        assert!(group.node(1).is_master());
        assert!(group.node(2).is_replica());

        // The new master must not have lost any VLSNs.
        assert!(group.node(1).current_vlsn() >= 10);
    }

    #[test]
    fn await_master_finds_already_elected_master() {
        let group = started_group("hs5", 3);

        let idx = group.await_master(Duration::from_millis(200)).unwrap();
        assert_eq!(idx, 0);
    }

    #[test]
    fn await_master_times_out_when_no_master() {
        // Never started: no env is open, so no node can report Master.
        let group = RepTestBase::builder("hs6").group_size(3).build();

        let outcome = group.await_master(Duration::from_millis(50));
        assert!(outcome.is_err(), "must time out");
    }

    #[test]
    fn counting_listener_counts_transitions() {
        let group = started_group("hs7", 2);

        let listener = CountingListener::new();
        let master_env = group.node(0).get_env();
        master_env.set_state_change_listener(
            Arc::clone(&listener) as Arc<dyn StateChangeListener>
        );

        // Registering a listener fires once with the current state (Master).
        assert_eq!(listener.master_count(), 1);
    }

    #[test]
    fn catch_up_replica_after_partition() {
        let group = started_group("hs8", 2);

        // Phase 1: master and replica are both at VLSN 5.
        group.populate_db(1, 5).unwrap();
        group.assert_all_at_vlsn(5);

        // Phase 2: partition — only the master advances, to VLSN 15.
        group.populate_master_only(6, 10).unwrap();
        assert_eq!(group.node(0).current_vlsn(), 15);
        assert_eq!(group.node(1).current_vlsn(), 5);

        // Phase 3: the replica replays the missed range and is level again.
        group.catch_up_replica(1, 6, 10).unwrap();
        group.assert_all_at_vlsn(15);
    }
}