Skip to main content

noxu_rep/
test_harness.rs

1//! In-memory test harness for replication group testing.
2//!
3//! This module provides Rust analogs of the JE `RepTestBase` /
4//! `RepEnvInfo` (`com.sleepycat.je.rep.impl.RepTestBase` and
5//! `com.sleepycat.je.rep.utilint.RepTestUtils`) classes, used by
6//! the JE replication TCK to bring up multi-node groups in-process,
7//! drive lifecycle transitions, replicate VLSN entries, and assert
8//! invariants without depending on a real network.
9//!
10//! # Design philosophy
11//!
12//! noxu-rep's [`ReplicatedEnvironment`] is already drivable purely
13//! in-process: `become_master`, `become_replica`, `register_vlsn`,
14//! and `apply_entry` operate on the local node's state machine
15//! without requiring any TCP wiring (the TCP receive loop in
16//! `become_replica` is only spawned when an `EnvironmentImpl` has
17//! been attached via `with_environment`).  This harness builds on
18//! that property to provide a JE-style group abstraction that:
19//!
20//! * **Never opens TCP sockets.**  All "replication" between nodes
21//!   is driven by the harness calling the appropriate method on each
22//!   node directly.  This is the moral equivalent of running the
23//!   group with an in-memory [`crate::net::LocalChannel`] transport,
24//!   but without the protocol overhead — perfect for testing
25//!   higher-level invariants (commit ordering, failover, group
26//!   membership).
27//! * **Avoids hangs.**  Tests that use this harness cannot hang on
28//!   real network coordination because there is no real network.
29//!   Every operation is bounded.
30//! * **Stays close to JE TCK shape.**  Method names mirror JE's
31//!   `RepEnvInfo` / `RepTestBase` so port translations are
32//!   mechanical: `openEnv` → [`RepEnvInfo::open_env`], `closeEnv`
33//!   → [`RepEnvInfo::close_env`], `createGroup` →
34//!   [`RepTestBase::create_group`], `findMaster` →
35//!   [`RepTestBase::find_master`], `populateDB` →
36//!   [`RepTestBase::populate_db`], etc.
37//!
38//! Tests that exercise the real network protocol layer should
39//! continue to use `cluster_integration_test.rs`-style
40//! [`crate::net::TcpChannel`] / [`crate::net::TcpChannelListener`]
41//! setups.  This harness is for the layer above.
42//!
43//! # Quick start
44//!
45//! ```no_run
46//! # #[cfg(any(test, feature = "test-harness"))]
47//! # fn demo() {
48//! use noxu_rep::test_harness::RepTestBase;
49//!
50//! // Spin up a 3-node group, elect node 0 as master, replicate
51//! // 100 entries, and assert all replicas applied them.
52//! let mut group = RepTestBase::builder("demo_group").group_size(3).build();
53//! group.create_group(/* master_term */ 1).unwrap();
//! group.populate_db(1, 100).unwrap();
55//! group.assert_all_at_vlsn(100);
56//! group.shutdown_all();
57//! # }
58//! ```
59
60use std::sync::Arc;
61use std::sync::atomic::{AtomicU16, Ordering};
62use std::time::{Duration, Instant};
63
64use crate::error::{RepError, Result};
65use crate::node_state::NodeState;
66use crate::node_type::NodeType;
67use crate::quorum_policy::QuorumPolicy;
68use crate::rep_config::RepConfig;
69use crate::replicated_environment::ReplicatedEnvironment;
70use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
71
72// ---------------------------------------------------------------------------
73// Port allocation
74// ---------------------------------------------------------------------------
75
/// First port of the window from which harness groups receive port ranges.
const PORT_WINDOW_START: u16 = 40_000;

/// Exclusive upper bound of the allocation window.  Ranges are kept below
/// this value to stay clear of the ephemeral port range.
const PORT_WINDOW_END: u16 = 60_000;

/// Process-wide cursor used to give each harness group a disjoint port
/// range.  noxu-rep's in-process state-machine harness does not actually
/// open these ports, but `RepConfig` requires a port to be set, and giving
/// each test its own range keeps any future TCP-using harness extension
/// forward-compatible.
static NEXT_BASE_PORT: AtomicU16 = AtomicU16::new(PORT_WINDOW_START);

/// Reserves a block of `group_size + 16` consecutive ports and returns the
/// first port of that block.
///
/// The extra 16 ports leave headroom for mid-test `add_peer` expansions,
/// monitors, etc.  When the block would extend past [`PORT_WINDOW_END`] the
/// allocator wraps: the caller receives a block starting at
/// [`PORT_WINDOW_START`] and the cursor is placed right behind it, so a
/// returned block never straddles the upper bound (unless the requested
/// span is itself larger than the whole window).
///
/// All arithmetic saturates, so absurdly large group sizes cannot overflow
/// `u16`.
fn alloc_base_port(group_size: usize) -> u16 {
    let span = u16::try_from(group_size)
        .unwrap_or(u16::MAX)
        .saturating_add(16);

    let mut current = NEXT_BASE_PORT.load(Ordering::SeqCst);
    loop {
        let end = current.saturating_add(span);
        // Decide which block this caller gets and where the cursor moves.
        let (base, next) = if end > PORT_WINDOW_END {
            (PORT_WINDOW_START, PORT_WINDOW_START.saturating_add(span))
        } else {
            (current, end)
        };
        match NEXT_BASE_PORT.compare_exchange(
            current,
            next,
            Ordering::SeqCst,
            Ordering::SeqCst,
        ) {
            Ok(_) => return base,
            // Another thread moved the cursor first; retry from its value.
            Err(actual) => current = actual,
        }
    }
}
103
104// ---------------------------------------------------------------------------
105// RepEnvInfo
106// ---------------------------------------------------------------------------
107
/// Per-node information held by [`RepTestBase`].
///
/// Mirrors JE's `RepTestUtils.RepEnvInfo` — owns one node's
/// configuration and (optionally) its [`ReplicatedEnvironment`]
/// once `open_env` has been called.
///
/// The environment is stored behind an `Arc`, so [`RepEnvInfo::get_env`]
/// can hand out cheap shared handles while this struct keeps its own
/// reference.  The struct itself does not implement `Clone`.
pub struct RepEnvInfo {
    /// Replication configuration used every time the environment is opened.
    config: RepConfig,
    /// Node id (1-based, matching JE convention).
    node_id: u32,
    /// `None` until `open_env` is called, and again after `close_env` /
    /// `abnormal_close_env`.
    env: Option<Arc<ReplicatedEnvironment>>,
}
123
124impl RepEnvInfo {
125    /// Construct a `RepEnvInfo` with a configuration but no open environment.
126    /// Mirrors `new RepEnvInfo(envHome, repConfig, envConfig)` in JE.
127    pub fn new(config: RepConfig, node_id: u32) -> Self {
128        Self { config, node_id, env: None }
129    }
130
131    /// Open the [`ReplicatedEnvironment`] for this node.  After `open_env`
132    /// the node is in [`NodeState::Detached`] (as just-opened) until a
133    /// `become_master` / `become_replica` call drives a transition.
134    ///
135    /// Mirrors JE's `RepEnvInfo.openEnv`.
136    pub fn open_env(&mut self) -> Result<Arc<ReplicatedEnvironment>> {
137        if self.env.is_some() {
138            return Err(RepError::StateError(
139                "rep env already exists".to_string(),
140            ));
141        }
142        let env = Arc::new(ReplicatedEnvironment::new(self.config.clone())?);
143        env.init_self_weak();
144        self.env = Some(Arc::clone(&env));
145        Ok(env)
146    }
147
148    /// Close the environment and drop our handle.  After `close_env`,
149    /// `open_env` may be called again to simulate a node restart.
150    ///
151    /// Mirrors JE's `RepEnvInfo.closeEnv`.
152    pub fn close_env(&mut self) -> Result<()> {
153        if let Some(env) = self.env.take() {
154            env.close()?;
155        }
156        Ok(())
157    }
158
159    /// Drop the env handle without calling `close()` — simulates a crash.
160    /// Subsequent `open_env` will create a fresh node.
161    ///
162    /// Mirrors JE's `RepEnvInfo.abnormalCloseEnv`.
163    pub fn abnormal_close_env(&mut self) {
164        let _ = self.env.take();
165    }
166
167    /// Returns the open env handle, panicking if `open_env` has not been
168    /// called.  Use [`RepEnvInfo::env`] for a fallible accessor.
169    pub fn get_env(&self) -> Arc<ReplicatedEnvironment> {
170        self.env.as_ref().expect("open_env not called yet").clone()
171    }
172
173    /// Returns the open env handle, or `None` if not yet opened.
174    pub fn env(&self) -> Option<&Arc<ReplicatedEnvironment>> {
175        self.env.as_ref()
176    }
177
178    /// Returns the [`RepConfig`] for this node.
179    pub fn rep_config(&self) -> &RepConfig {
180        &self.config
181    }
182
183    /// Returns the node name (`config.node_name`).
184    pub fn node_name(&self) -> &str {
185        &self.config.node_name
186    }
187
188    /// Returns the 1-based node id.
189    pub fn node_id(&self) -> u32 {
190        self.node_id
191    }
192
193    // ---- State accessors (JE: isMaster / isReplica / isUnknown) ----
194
195    /// Returns `true` iff the node is currently in [`NodeState::Master`].
196    pub fn is_master(&self) -> bool {
197        self.env.as_ref().is_some_and(|e| e.get_state() == NodeState::Master)
198    }
199
200    /// Returns `true` iff the node is currently in [`NodeState::Replica`].
201    pub fn is_replica(&self) -> bool {
202        self.env.as_ref().is_some_and(|e| e.get_state() == NodeState::Replica)
203    }
204
205    /// Returns `true` iff the node is currently in [`NodeState::Unknown`].
206    pub fn is_unknown(&self) -> bool {
207        self.env.as_ref().is_some_and(|e| e.get_state() == NodeState::Unknown)
208    }
209
210    /// Returns the current node state, or `None` if the env is not open.
211    pub fn state(&self) -> Option<NodeState> {
212        self.env.as_ref().map(|e| e.get_state())
213    }
214
215    /// Returns the current VLSN, or `0` if the env is not open.
216    pub fn current_vlsn(&self) -> u64 {
217        self.env.as_ref().map(|e| e.get_current_vlsn()).unwrap_or(0)
218    }
219}
220
221// ---------------------------------------------------------------------------
222// RepTestBase + builder
223// ---------------------------------------------------------------------------
224
/// JE-style replication group test fixture.
///
/// Mirrors JE's `RepTestBase` (`com.sleepycat.je.rep.impl.RepTestBase`).
/// Encapsulates a group of `N` nodes that share a group name, port range,
/// and election policy, and provides the lifecycle / replication / assertion
/// helpers that the JE rep TCK uses.
///
/// Use [`RepTestBase::builder`] to construct one; call
/// [`RepTestBase::create_group`] to bring up all nodes; call
/// [`RepTestBase::shutdown_all`] (or rely on `Drop`) to tear them down.
pub struct RepTestBase {
    /// Name shared by every node of the group.
    group_name: String,
    /// One entry per node, in index order (index 0 is the initial master).
    nodes: Vec<RepEnvInfo>,
    /// Election term handed to the next [`RepTestBase::failover_to`] call.
    /// [`RepTestBase::create_group`] resets it to `term + 1`, and every
    /// `failover_to` call consumes one value up front (even if the failover
    /// itself then fails), so `become_master` always observes a strictly
    /// increasing term.
    next_term: std::cell::Cell<u64>,
}
244
245impl RepTestBase {
246    /// Start building a new group with the given group name.
247    pub fn builder(group_name: impl Into<String>) -> RepTestBaseBuilder {
248        RepTestBaseBuilder::new(group_name)
249    }
250
251    /// Number of nodes in the group.
252    pub fn group_size(&self) -> usize {
253        self.nodes.len()
254    }
255
256    /// Borrow node at index `idx` (0-based — JE's `repEnvInfo[i]`).
257    pub fn node(&self, idx: usize) -> &RepEnvInfo {
258        &self.nodes[idx]
259    }
260
261    /// Borrow node at index `idx` mutably.
262    pub fn node_mut(&mut self, idx: usize) -> &mut RepEnvInfo {
263        &mut self.nodes[idx]
264    }
265
266    /// Borrow all nodes.
267    pub fn nodes(&self) -> &[RepEnvInfo] {
268        &self.nodes
269    }
270
271    /// Borrow all nodes mutably.
272    pub fn nodes_mut(&mut self) -> &mut [RepEnvInfo] {
273        &mut self.nodes
274    }
275
276    /// Returns the group name.
277    pub fn group_name(&self) -> &str {
278        &self.group_name
279    }
280
281    // ---- Lifecycle ----
282
283    /// Open every node's env, elect node 0 as master with `term`, and join
284    /// nodes 1..N as replicas pointing at node 0.
285    ///
286    /// Mirrors JE's `RepTestBase.createGroup` (which opens N nodes and
287    /// expects the first to become master, the rest replicas).
288    pub fn create_group(&mut self, term: u64) -> Result<()> {
289        self.create_group_of_size(self.nodes.len(), term)
290    }
291
292    /// Same as [`Self::create_group`] but only brings up the first
293    /// `first_n` nodes — JE's `createGroup(int firstn)` overload.
294    pub fn create_group_of_size(
295        &mut self,
296        first_n: usize,
297        term: u64,
298    ) -> Result<()> {
299        if first_n == 0 || first_n > self.nodes.len() {
300            return Err(RepError::ConfigError(format!(
301                "first_n ({first_n}) must be in 1..={}",
302                self.nodes.len()
303            )));
304        }
305
306        // Open all envs first so each node knows about its peers via the
307        // GroupService / RepGroup state.
308        for node in &mut self.nodes[..first_n] {
309            if node.env.is_none() {
310                node.open_env()?;
311            }
312        }
313
314        // Add every other node as a peer of every node so that
315        // `get_rep_group()` reflects the topology.  This mirrors JE's
316        // helper-host handshake without needing TCP.
317        let peer_specs: Vec<crate::rep_node::RepNode> = self.nodes[..first_n]
318            .iter()
319            .map(|n| {
320                crate::rep_node::RepNode::new(
321                    n.config.node_name.clone(),
322                    n.config.node_type,
323                    n.config.node_host.clone(),
324                    n.config.node_port,
325                    n.node_id,
326                )
327            })
328            .collect();
329
330        for node in &self.nodes[..first_n] {
331            let env = node.get_env();
332            for peer in &peer_specs {
333                if peer.name == node.config.node_name {
334                    continue;
335                }
336                // Best-effort: ignore "already exists" errors.
337                let _ = env.add_peer(peer.clone());
338            }
339        }
340
341        // Elect node 0 as master.
342        self.nodes[0].get_env().become_master(term)?;
343        let master_name = self.nodes[0].config.node_name.clone();
344
345        // Other nodes become replicas pointing at node 0.
346        for node in &self.nodes[1..first_n] {
347            node.get_env().become_replica(&master_name)?;
348        }
349
350        self.next_term.set(term + 1);
351        Ok(())
352    }
353
354    /// Close every node's env (master last, to avoid spurious elections —
355    /// matches JE's `closeNodes`).
356    pub fn shutdown_all(&mut self) {
357        let mut master_idx: Option<usize> = None;
358        for (idx, node) in self.nodes.iter_mut().enumerate() {
359            if node.is_master() {
360                master_idx = Some(idx);
361                continue;
362            }
363            let _ = node.close_env();
364        }
365        if let Some(idx) = master_idx {
366            let _ = self.nodes[idx].close_env();
367        }
368    }
369
370    // ---- Master / replica accessors ----
371
372    /// Find the unique master, or `None` if no node is currently master.
373    /// Mirrors JE's `RepTestBase.findMaster`.
374    pub fn find_master(&self) -> Option<&RepEnvInfo> {
375        self.nodes.iter().find(|n| n.is_master())
376    }
377
378    /// Find the master, or `None` — mutable variant.
379    pub fn find_master_mut(&mut self) -> Option<&mut RepEnvInfo> {
380        self.nodes.iter_mut().find(|n| n.is_master())
381    }
382
383    /// Index of the unique master, or `None`.
384    pub fn find_master_idx(&self) -> Option<usize> {
385        self.nodes.iter().position(|n| n.is_master())
386    }
387
388    /// All replica nodes.
389    pub fn replicas(&self) -> Vec<&RepEnvInfo> {
390        self.nodes.iter().filter(|n| n.is_replica()).collect()
391    }
392
393    /// Wait up to `timeout` for some node to be master, polling at
394    /// `Duration::from_millis(20)` intervals.  Returns the master's
395    /// index on success.  Mirrors JE's `findMasterWait`.
396    pub fn await_master(&self, timeout: Duration) -> Result<usize> {
397        let deadline = Instant::now() + timeout;
398        loop {
399            if let Some(idx) = self.find_master_idx() {
400                return Ok(idx);
401            }
402            if Instant::now() >= deadline {
403                return Err(RepError::StateError(format!(
404                    "timeout: no master after {:?}",
405                    timeout
406                )));
407            }
408            std::thread::sleep(Duration::from_millis(20));
409        }
410    }
411
412    /// Wait up to `timeout` for node `idx` to enter `target` state.
413    pub fn await_state(
414        &self,
415        idx: usize,
416        target: NodeState,
417        timeout: Duration,
418    ) -> Result<()> {
419        let deadline = Instant::now() + timeout;
420        loop {
421            if self.nodes[idx].state() == Some(target) {
422                return Ok(());
423            }
424            if Instant::now() >= deadline {
425                return Err(RepError::StateError(format!(
426                    "timeout: node {} did not reach {:?} after {:?} (current: {:?})",
427                    idx,
428                    target,
429                    timeout,
430                    self.nodes[idx].state(),
431                )));
432            }
433            std::thread::sleep(Duration::from_millis(20));
434        }
435    }
436
437    /// Wait up to `timeout` for node `idx`'s VLSN to reach at least `vlsn`.
438    pub fn await_vlsn_at_least(
439        &self,
440        idx: usize,
441        vlsn: u64,
442        timeout: Duration,
443    ) -> Result<()> {
444        let deadline = Instant::now() + timeout;
445        loop {
446            if self.nodes[idx].current_vlsn() >= vlsn {
447                return Ok(());
448            }
449            if Instant::now() >= deadline {
450                return Err(RepError::StateError(format!(
451                    "timeout: node {} did not reach VLSN {} after {:?} (current: {})",
452                    idx,
453                    vlsn,
454                    timeout,
455                    self.nodes[idx].current_vlsn(),
456                )));
457            }
458            std::thread::sleep(Duration::from_millis(20));
459        }
460    }
461
462    // ---- Replication operations ----
463
464    /// Register a single VLSN on the master and apply it to every other
465    /// node (acting as replicas).  This is the in-process moral equivalent
466    /// of "the master commits the txn, and the feeder streams it".
467    ///
468    /// `entry_type` is the replica-side `apply_entry` discriminator (a `u8`
469    /// that on the JE side selects between LN / commit / abort entries).
470    pub fn replicate_one(
471        &self,
472        vlsn: u64,
473        file: u32,
474        offset: u32,
475        entry_type: u8,
476    ) -> Result<()> {
477        let master_idx = self.find_master_idx().ok_or_else(|| {
478            RepError::StateError("no master to replicate from".to_string())
479        })?;
480        let master = self.nodes[master_idx].get_env();
481        master.register_vlsn(vlsn, file, offset);
482
483        for (i, node) in self.nodes.iter().enumerate() {
484            if i == master_idx || !node.is_replica() {
485                continue;
486            }
487            node.get_env().apply_entry(vlsn, entry_type, vec![0u8; 8])?;
488        }
489        Ok(())
490    }
491
492    /// Replicate `count` VLSN entries starting at `start_vlsn`.  Mirrors
493    /// JE's `populateDB(rep, dbName, start, n)` for the harness layer:
494    /// the master records each VLSN and replicas apply it in order.
495    pub fn populate_db(&self, start_vlsn: u64, count: u64) -> Result<()> {
496        for offset in 0..count {
497            let vlsn = start_vlsn + offset;
498            // entry_type=0 ⇒ generic LN_TRANSACTIONAL marker on the apply
499            // side; the harness does not exercise type-specific logic.
500            self.replicate_one(vlsn, 0, (vlsn as u32).wrapping_mul(16), 0)?;
501        }
502        Ok(())
503    }
504
505    /// Same as [`Self::populate_db`] but only writes to the master and
506    /// leaves replicas in the dust — useful for partition / catch-up tests.
507    pub fn populate_master_only(
508        &self,
509        start_vlsn: u64,
510        count: u64,
511    ) -> Result<()> {
512        let master = self.find_master().ok_or_else(|| {
513            RepError::StateError("no master to populate".to_string())
514        })?;
515        for offset in 0..count {
516            let vlsn = start_vlsn + offset;
517            master.get_env().register_vlsn(
518                vlsn,
519                0,
520                (vlsn as u32).wrapping_mul(16),
521            );
522        }
523        Ok(())
524    }
525
526    /// Replay `start_vlsn..start_vlsn+count` on a single replica — used to
527    /// simulate a replica catching up after a partition.
528    pub fn catch_up_replica(
529        &self,
530        replica_idx: usize,
531        start_vlsn: u64,
532        count: u64,
533    ) -> Result<()> {
534        let env = self.nodes[replica_idx].get_env();
535        for offset in 0..count {
536            let vlsn = start_vlsn + offset;
537            env.apply_entry(vlsn, 0, vec![0u8; 8])?;
538        }
539        Ok(())
540    }
541
542    // ---- Failover ----
543
544    /// Close the current master; mirrors JE's `leaveGroupAllButMaster`'s
545    /// inverse — kill the master, leaving replicas in [`NodeState::Replica`]
546    /// until a [`Self::failover_to`] call drives a new election.
547    ///
548    /// Returns the index of the closed master.
549    pub fn close_master(&mut self) -> Result<usize> {
550        let idx = self.find_master_idx().ok_or_else(|| {
551            RepError::StateError("no master to close".to_string())
552        })?;
553        self.nodes[idx].close_env()?;
554        Ok(idx)
555    }
556
557    /// Drive replica `replica_idx` through `Replica → Unknown → Master`
558    /// using a fresh term, then point all other live replicas at the new
559    /// master.  Mirrors JE's `transferMaster` for the in-process harness.
560    pub fn failover_to(&mut self, replica_idx: usize) -> Result<()> {
561        let term = self.next_term.get();
562        self.next_term.set(term + 1);
563
564        let target_env = self.nodes[replica_idx].get_env();
565        target_env.ensure_unknown_state()?;
566        target_env.become_master(term)?;
567
568        let new_master_name = self.nodes[replica_idx].config.node_name.clone();
569        for (i, node) in self.nodes.iter().enumerate() {
570            if i == replica_idx {
571                continue;
572            }
573            if node.env.is_none() {
574                continue;
575            }
576            // Skip nodes that are already master (shouldn't happen) or
577            // detached / shutdown.
578            let env = node.get_env();
579            let s = env.get_state();
580            if matches!(s, NodeState::Detached | NodeState::Shutdown) {
581                continue;
582            }
583            env.ensure_unknown_state()?;
584            env.become_replica(&new_master_name)?;
585        }
586        Ok(())
587    }
588
589    // ---- Assertions ----
590
591    /// Assert every node currently in [`NodeState::Master`] or
592    /// [`NodeState::Replica`] reports `vlsn` as its `current_vlsn`.
593    /// Panics on mismatch.
594    pub fn assert_all_at_vlsn(&self, vlsn: u64) {
595        for node in &self.nodes {
596            if !(node.is_master() || node.is_replica()) {
597                continue;
598            }
599            assert_eq!(
600                node.current_vlsn(),
601                vlsn,
602                "node {} ({:?}) at unexpected VLSN",
603                node.node_name(),
604                node.state(),
605            );
606        }
607    }
608
609    /// Assert node `idx` is in `state`.
610    pub fn assert_state(&self, idx: usize, state: NodeState) {
611        assert_eq!(
612            self.nodes[idx].state(),
613            Some(state),
614            "node {} ({}) wrong state",
615            idx,
616            self.nodes[idx].node_name(),
617        );
618    }
619}
620
621impl Drop for RepTestBase {
622    fn drop(&mut self) {
623        // Best-effort cleanup if the test forgot to call shutdown_all.
624        self.shutdown_all();
625    }
626}
627
628// ---------------------------------------------------------------------------
629// Builder
630// ---------------------------------------------------------------------------
631
/// Builder for [`RepTestBase`].  Use [`RepTestBase::builder`] to construct.
///
/// Every setter consumes and returns the builder; `build` turns the
/// collected settings into one `RepConfig` per node.
pub struct RepTestBaseBuilder {
    /// Group name given to every node's `RepConfig`.
    group_name: String,
    /// Number of nodes to create (default 3).
    group_size: usize,
    /// First port of the group; `None` means "allocate a range at build
    /// time".
    base_port: Option<u16>,
    /// Node type used for every index without an explicit override.
    node_type: NodeType,
    /// Election timeout forwarded to `RepConfig` when set.
    election_timeout: Option<Duration>,
    /// Quorum policy forwarded to `RepConfig` when set.
    quorum_policy: Option<QuorumPolicy>,
    /// Prefix for node names; `None` means `"{group_name}_n"`.
    name_prefix: Option<String>,
    /// Override the node type for specific indices (e.g. mark node 4 as
    /// Secondary in an otherwise-Electable group).
    node_type_overrides: Vec<(usize, NodeType)>,
}
645
646impl RepTestBaseBuilder {
647    fn new(group_name: impl Into<String>) -> Self {
648        Self {
649            group_name: group_name.into(),
650            group_size: 3,
651            base_port: None,
652            node_type: NodeType::Electable,
653            election_timeout: None,
654            quorum_policy: None,
655            name_prefix: None,
656            node_type_overrides: Vec::new(),
657        }
658    }
659
660    /// Number of nodes in the group (default: 3).
661    pub fn group_size(mut self, n: usize) -> Self {
662        self.group_size = n;
663        self
664    }
665
666    /// Base port; node `i` will use `base_port + i`.  Default: a process-wide
667    /// monotonically allocated port range that does not overlap other
668    /// concurrently-running harness groups.
669    pub fn base_port(mut self, p: u16) -> Self {
670        self.base_port = Some(p);
671        self
672    }
673
674    /// Default node type for every node (default: [`NodeType::Electable`]).
675    pub fn node_type(mut self, t: NodeType) -> Self {
676        self.node_type = t;
677        self
678    }
679
680    /// Override the node type for a specific index.  May be called multiple
681    /// times; later calls override earlier ones for the same index.
682    pub fn override_node_type(mut self, idx: usize, t: NodeType) -> Self {
683        self.node_type_overrides.push((idx, t));
684        self
685    }
686
687    /// Election timeout passed to [`RepConfig`].
688    pub fn election_timeout(mut self, t: Duration) -> Self {
689        self.election_timeout = Some(t);
690        self
691    }
692
693    /// Quorum policy passed to [`RepConfig`].
694    pub fn quorum_policy(mut self, q: QuorumPolicy) -> Self {
695        self.quorum_policy = Some(q);
696        self
697    }
698
699    /// Per-node name prefix; the i-th node will be named
700    /// `"{prefix}{i+1}"`.  Default: derived from the group name.
701    pub fn name_prefix(mut self, p: impl Into<String>) -> Self {
702        self.name_prefix = Some(p.into());
703        self
704    }
705
706    /// Construct the [`RepTestBase`].  Does NOT open any envs — call
707    /// [`RepTestBase::create_group`] to drive the lifecycle.
708    pub fn build(self) -> RepTestBase {
709        let base_port =
710            self.base_port.unwrap_or_else(|| alloc_base_port(self.group_size));
711        let prefix = self
712            .name_prefix
713            .unwrap_or_else(|| format!("{}_n", self.group_name));
714
715        let mut overrides = std::collections::HashMap::new();
716        for (idx, t) in self.node_type_overrides {
717            overrides.insert(idx, t);
718        }
719
720        let mut nodes = Vec::with_capacity(self.group_size);
721        for i in 0..self.group_size {
722            let node_name = format!("{}{}", prefix, i + 1);
723            let node_type = *overrides.get(&i).unwrap_or(&self.node_type);
724            let port = base_port + i as u16;
725
726            let mut b =
727                RepConfig::builder(&self.group_name, &node_name, "127.0.0.1")
728                    .node_port(port)
729                    .node_type(node_type);
730            if let Some(t) = self.election_timeout {
731                b = b.election_timeout(t);
732            }
733            if let Some(q) = self.quorum_policy.clone() {
734                b = b.quorum_policy(q);
735            }
736            let config = b.build();
737            nodes.push(RepEnvInfo::new(config, (i + 1) as u32));
738        }
739
740        RepTestBase {
741            group_name: self.group_name,
742            nodes,
743            next_term: std::cell::Cell::new(1),
744        }
745    }
746}
747
748// ---------------------------------------------------------------------------
749// Listener helpers
750// ---------------------------------------------------------------------------
751
/// `StateChangeListener` that tallies how often each node state was
/// entered: master / replica / unknown / detached / shutdown.  Mirrors
/// JE's `MasterListener` and friends but generalized — every test that
/// wanted a "wait for master became X" listener can read the relevant
/// counter.
///
/// The counters are public atomics, so they can also be read or adjusted
/// directly; the `*_count` methods are convenience readers.
#[derive(Default)]
pub struct CountingListener {
    /// Number of transitions into the master state.
    pub master: std::sync::atomic::AtomicUsize,
    /// Number of transitions into the replica state.
    pub replica: std::sync::atomic::AtomicUsize,
    /// Number of transitions into the unknown state.
    pub unknown: std::sync::atomic::AtomicUsize,
    /// Number of transitions into the detached state.
    pub detached: std::sync::atomic::AtomicUsize,
    /// Number of transitions into the shutdown state.
    pub shutdown: std::sync::atomic::AtomicUsize,
}

impl CountingListener {
    /// Creates a listener with all counters at zero, already wrapped in an
    /// `Arc` so it can be registered and inspected at the same time.
    pub fn new() -> Arc<Self> {
        Arc::default()
    }

    /// Sequentially-consistent read of one counter.
    fn read(counter: &std::sync::atomic::AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    /// Current value of the `master` counter.
    pub fn master_count(&self) -> usize {
        Self::read(&self.master)
    }

    /// Current value of the `replica` counter.
    pub fn replica_count(&self) -> usize {
        Self::read(&self.replica)
    }

    /// Current value of the `unknown` counter.
    pub fn unknown_count(&self) -> usize {
        Self::read(&self.unknown)
    }

    /// Current value of the `detached` counter.
    pub fn detached_count(&self) -> usize {
        Self::read(&self.detached)
    }

    /// Current value of the `shutdown` counter.
    pub fn shutdown_count(&self) -> usize {
        Self::read(&self.shutdown)
    }
}
786
787impl StateChangeListener for CountingListener {
788    fn on_state_change(&self, ev: StateChangeEvent) {
789        let counter = match ev.new_state {
790            NodeState::Master => &self.master,
791            NodeState::Replica => &self.replica,
792            NodeState::Unknown => &self.unknown,
793            NodeState::Detached => &self.detached,
794            NodeState::Shutdown => &self.shutdown,
795        };
796        counter.fetch_add(1, Ordering::SeqCst);
797    }
798}
799
800// ---------------------------------------------------------------------------
801// Tests for the harness itself
802// ---------------------------------------------------------------------------
803
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a group called `name` with `size` nodes and brings it up with
    /// node 0 as master at term 1.
    fn started_group(name: &str, size: usize) -> RepTestBase {
        let mut group = RepTestBase::builder(name).group_size(size).build();
        group.create_group(1).unwrap();
        group
    }

    /// The builder names nodes `<group>_n<i>` and assigns consecutive ports.
    #[test]
    fn builder_produces_n_nodes_with_disjoint_names() {
        let group = RepTestBase::builder("hs1").group_size(4).build();
        assert_eq!(group.group_size(), 4);

        let mut names: Vec<&str> = Vec::new();
        let mut ports: Vec<u16> = Vec::new();
        for node in group.nodes() {
            names.push(node.node_name());
            ports.push(node.rep_config().node_port);
        }
        assert_eq!(names, vec!["hs1_n1", "hs1_n2", "hs1_n3", "hs1_n4"]);

        // Each node's port is exactly one above its predecessor's.
        for pair in ports.windows(2) {
            assert!(
                pair[1] == pair[0] + 1,
                "ports must be consecutive: {:?}",
                ports
            );
        }
    }

    /// After `create_group`, node 0 leads and the others follow.
    #[test]
    fn create_group_elects_master_and_replicas() {
        let group = started_group("hs2", 3);
        let nodes = group.nodes();

        assert!(nodes[0].is_master(), "node 0 must be master");
        assert!(nodes[1].is_replica(), "node 1 must be replica");
        assert!(nodes[2].is_replica(), "node 2 must be replica");

        let master = group.find_master().unwrap();
        assert_eq!(master.node_name(), "hs2_n1");
    }

    /// Replicating VLSNs 1..=50 leaves every node at VLSN 50.
    #[test]
    fn populate_db_advances_all_replicas() {
        let group = started_group("hs3", 3);

        group.populate_db(1, 50).unwrap();
        group.assert_all_at_vlsn(50);
    }

    /// Killing the master and failing over promotes the chosen replica.
    #[test]
    fn failover_drives_replica_to_master() {
        let mut group = started_group("hs4", 3);

        // Master writes 10 entries.
        group.populate_db(1, 10).unwrap();
        group.assert_all_at_vlsn(10);

        // Master crashes.
        let closed = group.close_master().unwrap();
        assert_eq!(closed, 0);

        // Failover to node 1 (a former replica).
        group.failover_to(1).unwrap();

        // Node 1 must be master, node 2 must be its replica.
        let promoted = &group.nodes()[1];
        assert!(promoted.is_master());
        assert!(group.nodes()[2].is_replica());

        // VLSN must not regress.
        assert!(promoted.current_vlsn() >= 10);
    }

    /// `await_master` returns immediately when a master already exists.
    #[test]
    fn await_master_finds_already_elected_master() {
        let group = started_group("hs5", 3);
        let wait = Duration::from_millis(200);
        assert_eq!(group.await_master(wait).unwrap(), 0);
    }

    /// Without `create_group` there is no master, so the wait must fail.
    #[test]
    fn await_master_times_out_when_no_master() {
        let group = RepTestBase::builder("hs6").group_size(3).build();
        let outcome = group.await_master(Duration::from_millis(50));
        assert!(outcome.is_err(), "must time out");
    }

    /// Registering a listener on the master is reflected in its counters.
    #[test]
    fn counting_listener_counts_transitions() {
        let group = started_group("hs7", 2);

        let listener = CountingListener::new();
        let as_dyn: Arc<dyn StateChangeListener> = listener.clone();
        group.nodes()[0].get_env().set_state_change_listener(as_dyn);

        // Setting a listener fires once with the current state (Master).
        assert_eq!(listener.master_count(), 1);
    }

    /// A replica left behind during a partition can be replayed up to date.
    #[test]
    fn catch_up_replica_after_partition() {
        let group = started_group("hs8", 2);

        // Phase 1: both in sync at VLSN 5.
        group.populate_db(1, 5).unwrap();
        group.assert_all_at_vlsn(5);

        // Phase 2: partition — master writes alone.
        group.populate_master_only(6, 10).unwrap();
        let vlsns: Vec<u64> =
            group.nodes().iter().map(|n| n.current_vlsn()).collect();
        assert_eq!(vlsns[0], 15);
        assert_eq!(vlsns[1], 5);

        // Phase 3: replica catches up.
        group.catch_up_replica(1, 6, 10).unwrap();
        group.assert_all_at_vlsn(15);
    }
}