Skip to main content

noxu_rep/
replicated_environment.rs

1//! The main replicated environment API.
2//!
3//!
4//! A replicated database environment that is a node in a replication group.
5//! This is the entry point for replication. It wraps a standard Environment
6//! and adds replication capabilities including master election, replica
7//! streaming, and commit acknowledgments.
8//!
9//! # Replication node states
10//!
11//! The replication node state determines the operations that the application
12//! can perform against its replicated environment. The state transitions
13//! visible to the application can be summarized by the regular expression:
14//!
15//! ```text
16//! [ MASTER | REPLICA | UNKNOWN ]+ DETACHED
17//! ```
18//!
19//! When the first handle to a `ReplicatedEnvironment` is created and the node
20//! is brought up, the node usually establishes Master or Replica state. These
21//! states are preceded by the Unknown state. As various remote nodes become
22//! unavailable and elections are held, the local node may change between
23//! Master and Replica states, always with a (usually brief) transition through
24//! Unknown state.
25//!
26//! When the environment is closed, the node transitions to the Detached state.
27
28use noxu_dbi::{
29    AckWaitError, AckWaitErrorKind, EnvironmentImpl, ReplicaAckCoordinator,
30    ReplicaAckPolicyKind,
31};
32use noxu_sync::RwLock;
33use std::net::SocketAddr;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::OnceLock;
37use std::sync::Weak;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use crate::ack_tracker::AckTracker;
42use crate::elections::election_service::{
43    ELECTION_SERVICE_NAME, ElectionAcceptorState, ElectionService,
44};
45use crate::elections::master_tracker::MasterTracker;
46use crate::error::{RepError, Result};
47use crate::group_service::GroupService;
48use crate::master_transfer::MasterTransferConfig;
49use crate::net::service_dispatcher::{
50    AnyServiceDispatcher, TcpServiceDispatcher,
51};
52use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
53use crate::network_restore_server::{
54    NetworkRestoreServer, RESTORE_SERVICE_NAME,
55};
56use crate::node_state::{NodeState, NodeStateMachine};
57use crate::rep_config::RepConfig;
58use crate::rep_stats::RepStats;
59use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
60use crate::stream::feeder::EnvironmentLogScanner;
61use crate::stream::feeder::Feeder;
62use crate::stream::feeder::FeederRunner;
63use crate::stream::peer_feeder::PeerScannerAdapter;
64use crate::stream::peer_feeder::{
65    PEER_FEEDER_SERVICE_NAME, PeerFeederService, PeerLogScanner,
66};
67use crate::stream::replica_stream::{EnvironmentLogWriter, ReplicaStream};
68use crate::stream::syncup::{
69    Matchpoint, RollbackDecision, find_matchpoint, verify_rollback,
70};
71use crate::stream::syncup_reader::VlsnIndexView;
72use crate::vlsn::vlsn_index::VlsnIndex;
73use crate::vlsn::vlsn_range::VlsnRange;
74use std::collections::HashMap;
75
/// Default heartbeat timeout for master liveness detection (30 seconds).
///
/// Handed to `MasterTracker::new` when a node is constructed.
const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(30_000);
78
79/// A replicated database environment.
80///
81///
82///
83/// This is the entry point for replication. It wraps a standard Environment
84/// and adds replication capabilities including master election, replica
85/// streaming, and commit acknowledgments.
86///
87/// High Availability (HA) provides a replicated, embedded database
88/// management system which provides fast, reliable, and scalable data
89/// management. HA enables replication of an environment across a Replication
90/// Group. A `ReplicatedEnvironment` is a single node in the replication group.
91///
92/// `ReplicatedEnvironment` wraps a standard `Environment`. All database
93/// operations are executed in the same fashion in both replicated and
94/// non-replicated applications. A `ReplicatedEnvironment` must be
95/// transactional. All replicated databases created in the replicated
96/// environment must be transactional as well.
97///
98/// A `ReplicatedEnvironment` joins its replication group when it is created.
99/// When `new()` returns, the node will have established contact with the other
100/// members of the group and will be ready to service operations.
101///
102/// Replicated environments can be created with node type Electable or
103/// Secondary. Electable nodes can be masters or replicas, and participate in
104/// both master elections and commit durability decisions. Secondary nodes can
105/// only be replicas, not masters, and do not participate in either elections or
106/// durability decisions.
107///
108/// # Example
109///
110/// ```ignore
111/// use noxu_rep::{ReplicatedEnvironment, RepConfig};
112///
113/// let config = RepConfig::builder("my_group", "node1", "localhost")
114///     .node_port(5001)
115///     .build();
116/// let rep_env = ReplicatedEnvironment::new(config).unwrap();
117/// ```
118/// Outcome of [`ReplicatedEnvironment::syncup_with_feeder`] — the action taken
119/// by a live diverged-tail syncup. Port of the branch in JE
120/// `ReplicaFeederSyncup.execute` between a soft rollback and a network restore.
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub enum SyncupAction {
123    /// The divergent tail was rolled back to the matchpoint; resume streaming
124    /// from `start_vlsn` (`matchpoint + 1`). `matchpoint_vlsn == last VLSN`
125    /// means the replica was not diverged and nothing was truncated.
126    RolledBack { matchpoint_vlsn: u64, start_vlsn: u64 },
127    /// No safe rollback (no common matchpoint, or it would cross a committed
128    /// txn); the replica must do a full network restore.
129    NeedsRestore,
130}
131
132pub struct ReplicatedEnvironment {
133    /// The replication configuration for this node.
134    config: RepConfig,
135    /// Tracks the current node state (Detached, Unknown, Master, Replica).
136    node_state: NodeStateMachine,
137    /// Service for managing the replication group membership.
138    group_service: GroupService,
139    /// Maps VLSNs to log file positions.
140    ///
141    /// Wrapped in `Arc` so that background daemons (election driver,
142    /// VLSN-index persistence flusher) can share access without
143    /// borrowing the env.  Closes finding F11 (
144    /// the 2026 review).
145    vlsn_index: Arc<VlsnIndex>,
146    /// Tracks acknowledgments from replicas (used by master).
147    ack_tracker: AckTracker,
148    /// Replication statistics.
149    stats: RepStats,
150    /// Active feeder threads (master -> replica streams).
151    feeders: RwLock<Vec<Feeder>>,
152    /// Replica stream for receiving updates from the master.
153    replica_stream: ReplicaStream,
154    /// Tracks the current master node.
155    master_tracker: MasterTracker,
156    /// State change listeners.
157    listeners: RwLock<Vec<Arc<dyn StateChangeListener>>>,
158    /// Shutdown flag.
159    shutdown: AtomicBool,
160    /// Service dispatcher — listens on the replication port and routes
161    /// incoming connections to the appropriate service handler (feeder, etc.).
162    ///
163    /// `Plain`: plain TCP (default / Phase-2 behaviour).
164    /// `Tls`: TLS + mTLS enforcement (Phase 3, when `RepConfig::tls_config` is set
165    /// and `transport_kind` is `Tls`).
166    ///
167    /// `None` only when the bind address cannot be resolved.
168    tcp_dispatcher: Option<AnyServiceDispatcher>,
169    /// The address the `tcp_dispatcher` is actually bound to (may differ from
170    /// the configured port when port 0 is used in tests).
171    bound_addr: Option<SocketAddr>,
172
173    /// Optional live `EnvironmentImpl` wired in via [`with_environment`].
174    ///
175    /// When set, `become_master` spawns a `FeederRunner` per replica using
176    /// `EnvironmentLogScanner`, and `become_replica` spawns a
177    /// `ReplicaReceiver` thread using `EnvironmentLogWriter`.
178    ///
179    /// In HA.
180    env_impl: StdMutex<Option<Arc<EnvironmentImpl>>>,
181
182    /// Background I/O thread handles spawned during state transitions.
183    ///
184    /// Stored so that `close()` can join them cleanly.  Each handle is
185    /// `Option` so we can `take()` it in `close()`.
186    io_threads: StdMutex<Vec<std::thread::JoinHandle<()>>>,
187
188    /// Shutdown flag shared with I/O threads so they terminate when the
189    /// environment is closed.
190    ///
191    /// Wrapped in an `Arc` so the replica receive thread (which connects to
192    /// an upstream feeder via `catch_up_from_peer`) can poll it directly and
193    /// break out of its blocking receive loop on close — otherwise a node
194    /// whose upstream stays connected (e.g. a mid-tier replica in a chain,
195    /// closed before its upstream) would never observe the close and
196    /// `close()`'s thread-join would hang.
197    io_shutdown: Arc<AtomicBool>,
198
199    /// Whether the RESTORE service has been registered on the TCP dispatcher.
200    ///
201    /// When `config.env_home` is `None` at construction time, registration is
202    /// deferred until `with_environment()` provides the env home path.
203    restore_registered: AtomicBool,
204
205    /// In-memory log queue used by the peer feeder service.
206    ///
207    /// When this node is a replica, `apply_entry()` pushes each received log
208    /// entry here.  The `PeerFeederService` registered on the TCP dispatcher
209    /// reads from this queue to stream entries to downstream replicas that
210    /// are behind this node (peer-to-peer log distribution, HA style).
211    peer_scanner: Arc<PeerLogScanner>,
212
213    /// Durable Transaction VLSN (D7, JE RepNode.dtvlsn): the highest VLSN
214    /// known to have been replicated to a *majority* of the electable
215    /// replicas. On a master it is computed from feeder ack/heartbeat progress
216    /// (`update_dtvlsn_from_feeders`); on a replica it is set from commit/abort
217    /// records in the stream (`set_dtvlsn`). It advances monotonically (an
218    /// `update_max`). 0 = NULL_VLSN. Used by the election ranking (D2) so the
219    /// most-durable node, not merely the highest-raw-VLSN node, wins.
220    dtvlsn: std::sync::atomic::AtomicU64,
221
222    /// Shared acceptor state used by the ELECTION service handler.
223    /// The election driver updates `own_vlsn` / `own_term` here as the
224    /// node progresses; incoming acceptor sessions read it on every
225    /// connection so their replies always reflect the local node's
226    /// most recent state.  Closes finding F6.
227    election_state: Arc<ElectionAcceptorState>,
228
229    /// Self-referential `Weak` populated once the env has been wrapped
230    /// in an `Arc`.  Used by the replica I/O thread spawned in
231    /// `become_replica` so it can call `bootstrap_via_dispatcher` when
232    /// the master signals `NeedsRestore`.
233    ///
234    /// Populated lazily via [`Self::init_self_weak`] from `open()` and
235    /// the test harness.  When unset (callers that build the env via
236    /// raw `Arc::new(Self::new(...))` and never call `init_self_weak`)
237    /// the I/O thread falls back to operator-driven bootstrap.
238    self_weak: OnceLock<Weak<Self>>,
239
240    // -----------------------------------------------------------------------
241    // C-C2: active push-feeder infrastructure
242    // -----------------------------------------------------------------------
243    /// Per-replica channels injected via [`Self::register_feeder_channel`].
244    ///
245    /// When [`Self::become_master`] is called (or when the node is already
246    /// master), a [`FeederRunner`] thread is spawned for each registered
247    /// channel, actively streaming entries to that replica over the channel.
248    ///
249    /// Using `register_feeder_channel` is the primary integration point for
250    /// the push-based feeder path.  Production deployments wire in a
251    /// `TcpChannel`; test code uses `LocalChannelPair`.
252    feeder_channels: StdMutex<HashMap<String, Arc<dyn crate::net::Channel>>>,
253
254    /// Per-replica dedicated entry queues backing the push-feeder path.
255    ///
256    /// Each `FeederRunner` thread reads exclusively from its replica's queue.
257    /// [`Self::replicate_entry`] and [`Self::apply_entry`] fan out into all
258    /// registered queues so the push runners receive entries without competing
259    /// with [`PeerFeederService`] for ownership of `peer_scanner`.
260    feeder_queues: std::sync::RwLock<HashMap<String, Arc<PeerLogScanner>>>,
261
262    /// Active `FeederRunner` references for acked-VLSN queries and
263    /// clean shutdown (M-4: wait for replicas to catch up).
264    active_feeder_runners: StdMutex<HashMap<String, Arc<FeederRunner>>>,
265
266    /// Monotone VLSN counter shared with the wired `EnvironmentImpl`.
267    ///
268    /// Installed into the environment via
269    /// `EnvironmentImpl::set_replication_vlsn_counter()` when
270    /// `with_environment` is called.  Each `log_txn_commit` on the master
271    /// atomically increments this counter and writes a VLSN-tagged WAL entry,
272    /// which `EnvironmentLogScanner` then picks up without any
273    /// `replicate_entry` call from the application.
274    wal_vlsn_counter: Arc<std::sync::atomic::AtomicU64>,
275
276    /// Count of downstream connections this node has served via the JE
277    /// `Feeder`/`MasterFeederSource` mechanism (`FeederRunner +
278    /// EnvironmentLogScanner` reading this node's WAL).  Shared with the
279    /// node's [`crate::stream::peer_feeder::PeerFeederService`] when a WAL
280    /// source is registered (master in `become_master`, or a cascading
281    /// replica in `become_replica`).  A non-zero value PROVES this node fed
282    /// a downstream by the SAME mechanism the master uses — the cascade does
283    /// not diverge.  See [`Self::wal_feeds_served`].
284    wal_feeds_served: Arc<std::sync::atomic::AtomicU64>,
285
286    /// REP-10 (C): the replica-side consistency tracker, built from the
287    /// REP-7 `last_applied_vlsn` handle when the replica replay thread starts
288    /// (`become_replica`).  `None` on a master or before replay is wired.
289    ///
290    /// A read that begins on a replica with a non-`NoConsistency` policy waits
291    /// on this tracker (`begin_read_consistency`).  Port of
292    /// `RepImpl.getConsistency` / `Replica.getConsistencyTracker`.
293    consistency_tracker: StdMutex<Option<crate::ConsistencyTracker>>,
294}
295
296impl ReplicatedEnvironment {
297    /// Create a new replicated environment.
298    ///
299    ///
300    ///
301    /// Creates a replicated environment handle and starts participating in the
302    /// replication group. The node's state is determined when it joins the
303    /// group, and mastership is not preconfigured. If the group has no current
304    /// master, creation will trigger an election to determine whether this node
305    /// will participate as a Master or a Replica.
306    ///
307    /// A brand new node will always join an existing group as a Replica, unless
308    /// it is the very first electable node that is creating the group. In that
309    /// case it joins as the Master of the newly formed singleton group.
310    pub fn new(config: RepConfig) -> Result<Self> {
311        // mTLS Phase 2 (v3.1.0): peer_allowlist enforcement is real at the
312        // TLS channel layer (TlsTcpChannelListener::bind_with_tls_and_allowlist).
313        // Phase 3 (this release): when RepConfig::tls_config is set AND
314        // transport_kind is Tls, the service dispatcher itself enforces mTLS
315        // via TlsTcpServiceDispatcher.  For the remaining cases (no TlsConfig
316        // or non-TLS transport) keep the Phase-2 accurate warn.
317        if !config.peer_allowlist.is_empty() {
318            match config.transport_kind {
319                crate::rep_config::RepTransportKind::Tls => {
320                    if config.tls_config.is_some() {
321                        log::info!(
322                            "[{}] peer_allowlist ({} entries) + tls_config set; \
323                             mTLS will be enforced on the service dispatcher.",
324                            config.node_name,
325                            config.peer_allowlist.len(),
326                        );
327                    } else {
328                        log::info!(
329                            "[{}] peer_allowlist configured ({} entries) but \
330                             tls_config is None — the service dispatcher will \
331                             use plain TCP. Set RepConfig::tls_config to \
332                             activate end-to-end mTLS on this path.",
333                            config.node_name,
334                            config.peer_allowlist.len(),
335                        );
336                    }
337                }
338                _ => {
339                    log::warn!(
340                        "[{}] peer_allowlist is configured ({} entries) but \
341                         transport_kind is not Tls — the allowlist has no \
342                         effect without TLS transport. Set \
343                         RepTransportKind::Tls to activate mTLS enforcement.",
344                        config.node_name,
345                        config.peer_allowlist.len(),
346                    );
347                }
348            }
349        }
350        let node_state = NodeStateMachine::new();
351        let group_service = GroupService::new(config.group_name.clone());
352        let vlsn_index = {
353            // F11: try to load a previously persisted vlsn.idx from
354            // env_home if one exists.  A successfully loaded index lets a
355            // restarted replica resume from where it left off without a
356            // full network restore; a missing or corrupt file falls back
357            // to a fresh in-memory index (caller will need to bootstrap).
358            if let Some(ref home) = config.env_home {
359                match crate::vlsn::persist::load_from_disk(home) {
360                    Ok(Some(idx)) => {
361                        log::info!(
362                            "Node '{}' loaded persisted VLSN index from {} \
363                             ({} entries, latest vlsn={})",
364                            config.node_name,
365                            home.display(),
366                            idx.snapshot_entries().len(),
367                            idx.get_latest_vlsn(),
368                        );
369                        Arc::new(idx)
370                    }
371                    Ok(None) => Arc::new(VlsnIndex::new(10)),
372                    Err(e) => {
373                        log::warn!(
374                            "Node '{}' failed to load persisted VLSN index \
375                             from {}: {} (treating as fresh node — network \
376                             restore required)",
377                            config.node_name,
378                            home.display(),
379                            e
380                        );
381                        // Best-effort: remove the corrupt file so the
382                        // next persist cycle writes a clean one.  A
383                        // missing file is the "fresh node" baseline.
384                        let _ = std::fs::remove_file(
385                            crate::vlsn::persist::index_path(home),
386                        );
387                        Arc::new(VlsnIndex::new(10))
388                    }
389                }
390            } else {
391                Arc::new(VlsnIndex::new(10))
392            }
393        };
394        let ack_tracker = AckTracker::new();
395        let stats = RepStats::new();
396        let feeders = RwLock::new(Vec::new());
397        let replica_stream = ReplicaStream::new();
398        let master_tracker = MasterTracker::new(DEFAULT_HEARTBEAT_TIMEOUT);
399
400        // Start the service dispatcher.
401        //
402        // Phase 3: when RepConfig::tls_config is set AND transport_kind is Tls,
403        // start a TlsTcpServiceDispatcher (mTLS enforced).  Otherwise fall back
404        // to the plain-TCP TcpServiceDispatcher.
405        let listen_addr_str =
406            format!("{}:{}", config.node_host, config.node_port);
407        let mut restore_registered_init = false;
408
409        // Returns (AnyServiceDispatcher, bound_addr) or (None, None) on error.
410        let (tcp_dispatcher, bound_addr) = match listen_addr_str
411            .parse::<SocketAddr>()
412        {
413            Ok(addr) => {
414                let build_result: Result<(AnyServiceDispatcher, SocketAddr)> =
415                    Self::build_dispatcher(&config, addr);
416                match build_result {
417                    Ok((dispatcher, bound)) => {
418                        // Register the network restore handler.
419                        if let Some(ref home) = config.env_home {
420                            let restore_server =
421                                NetworkRestoreServer::new(home.clone());
422                            dispatcher.register(
423                                RESTORE_SERVICE_NAME,
424                                Arc::new(restore_server),
425                            );
426                            log::debug!(
427                                "Node '{}' RESTORE service registered \
428                                     (env_home={})",
429                                config.node_name,
430                                home.display(),
431                            );
432                            restore_registered_init = true;
433                        }
434                        let kind =
435                            if dispatcher.is_tls() { "TLS" } else { "TCP" };
436                        log::info!(
437                            "Node '{}' {} service dispatcher started on {}",
438                            config.node_name,
439                            kind,
440                            bound
441                        );
442                        (Some(dispatcher), Some(bound))
443                    }
444                    Err(e) => {
445                        log::warn!(
446                            "Node '{}' failed to start service dispatcher \
447                             on {}: {}",
448                            config.node_name,
449                            listen_addr_str,
450                            e
451                        );
452                        (None, None)
453                    }
454                }
455            }
456            Err(e) => {
457                log::warn!(
458                    "Node '{}' cannot parse listen address '{}': {}",
459                    config.node_name,
460                    listen_addr_str,
461                    e
462                );
463                (None, None)
464            }
465        };
466
467        // Build the in-memory peer log scanner; register the peer feeder
468        // service on the dispatcher so downstream replicas can connect.
469        let peer_scanner = Arc::new(PeerLogScanner::new());
470        // F5/F31: build the acceptor state with persistence enabled when
471        // env_home is configured.  Crash-durable promises are required
472        // for the Paxos safety invariant after a process restart.
473        let election_state =
474            Arc::new(if let Some(ref home) = config.env_home {
475                ElectionAcceptorState::with_env_home(
476                    config.node_name.clone(),
477                    1,
478                    home,
479                )
480            } else {
481                ElectionAcceptorState::new(config.node_name.clone(), 1)
482            });
483        if let Some(ref dispatcher) = tcp_dispatcher {
484            let service = PeerFeederService::new(Arc::clone(&peer_scanner));
485            dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(service));
486            log::debug!(
487                "Node '{}' PEER_FEEDER service registered",
488                config.node_name,
489            );
490            // F6: register the ELECTION service so peers can run
491            // run_acceptor against this node when proposing.
492            let election_svc =
493                Arc::new(ElectionService::new(Arc::clone(&election_state)));
494            dispatcher.register(ELECTION_SERVICE_NAME, election_svc);
495            log::debug!(
496                "Node '{}' ELECTION service registered",
497                config.node_name,
498            );
499        }
500
501        let env = Self {
502            config,
503            node_state,
504            group_service,
505            vlsn_index,
506            ack_tracker,
507            stats,
508            feeders,
509            replica_stream,
510            master_tracker,
511            listeners: RwLock::new(Vec::new()),
512            shutdown: AtomicBool::new(false),
513            tcp_dispatcher,
514            bound_addr,
515            env_impl: StdMutex::new(None),
516            io_threads: StdMutex::new(Vec::new()),
517            io_shutdown: Arc::new(AtomicBool::new(false)),
518            restore_registered: AtomicBool::new(restore_registered_init),
519            peer_scanner,
520            dtvlsn: std::sync::atomic::AtomicU64::new(0),
521            election_state,
522            self_weak: OnceLock::new(),
523            feeder_channels: StdMutex::new(HashMap::new()),
524            feeder_queues: std::sync::RwLock::new(HashMap::new()),
525            active_feeder_runners: StdMutex::new(HashMap::new()),
526            wal_vlsn_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
527            wal_feeds_served: Arc::new(std::sync::atomic::AtomicU64::new(0)),
528            consistency_tracker: StdMutex::new(None),
529        };
530
531        Ok(env)
532    }
533
534    /// Open a replicated environment with the standard production
535    /// lifecycle.
536    ///
537    /// This is the entry point recommended by the mdBook chapters:
538    /// it allocates the `ReplicatedEnvironment`, registers all
539    /// services on the TCP dispatcher, and spawns the **election
540    /// driver** background thread that runs Paxos rounds against
541    /// known peers until the node has resolved into either Master or
542    /// Replica state.
543    ///
544    /// Closes finding F6 of the 2026 review.
545    ///
546    /// Use [`ReplicatedEnvironment::new`] directly only when the
547    /// caller plans to drive state transitions explicitly (test
548    /// harnesses, scripted bootstrap, recovery tooling).
549    pub fn open(config: RepConfig) -> Result<Arc<Self>> {
550        let env = Arc::new(Self::new(config)?);
551        env.init_self_weak();
552        env.start_election_driver();
553        env.start_vlsn_persistence_daemon();
554        env.register_admin_service();
555        Ok(env)
556    }
557
558    /// Build the service dispatcher for this node.
559    ///
560    /// Phase 3 logic: when `config.transport_kind == Tls` AND
561    /// `config.tls_config` is `Some`, start a
562    /// [`crate::net::service_dispatcher::TlsTcpServiceDispatcher`] that
563    /// enforces mTLS with the configured `peer_allowlist`.  Otherwise
564    /// start the plain-TCP [`TcpServiceDispatcher`].
565    ///
566    /// Returns `(dispatcher, bound_addr)` or a `RepError` on bind / TLS
567    /// config failure.
568    fn build_dispatcher(
569        #[cfg_attr(not(feature = "tls-rustls"), allow(unused_variables))]
570        config: &RepConfig,
571        addr: SocketAddr,
572    ) -> Result<(AnyServiceDispatcher, SocketAddr)> {
573        #[cfg(feature = "tls-rustls")]
574        if config.transport_kind == crate::rep_config::RepTransportKind::Tls {
575            use crate::auth::PeerAllowlist;
576            use crate::net::service_dispatcher::TlsTcpServiceDispatcher;
577            let tls = config.tls_config.as_ref().ok_or_else(|| {
578                RepError::ConfigError(format!(
579                    "node '{}': transport_kind=Tls requires a tls_config",
580                    config.node_name,
581                ))
582            })?;
583            let allowlist =
584                PeerAllowlist::new(config.peer_allowlist.iter().cloned());
585            // Fail-closed: an empty allowlist with TLS transport is a
586            // misconfiguration. The same policy is enforced at the TLS
587            // listener and QUIC constructors; downgrading to plain TCP here
588            // would be a silent security regression for a node that asked
589            // for TLS.
590            if allowlist.is_empty() {
591                return Err(RepError::ConfigError(format!(
592                    "node '{}': transport_kind=Tls requires a non-empty \
593                     peer_allowlist (mTLS enforcement is fail-closed)",
594                    config.node_name,
595                )));
596            }
597            let disp = TlsTcpServiceDispatcher::new(addr, tls, allowlist)?;
598            let bound = disp.start()?;
599            return Ok((AnyServiceDispatcher::Tls(disp), bound));
600        }
601        // Plain-TCP dispatcher (default or when TLS config is missing).
602        let disp = TcpServiceDispatcher::new(addr).map_err(|e| {
603            RepError::NetworkError(format!("TCP dispatcher init: {e}"))
604        })?;
605        let bound = disp.start()?;
606        Ok((AnyServiceDispatcher::Plain(disp), bound))
607    }
608
609    /// Populate the env's self-referential `Weak` so background
610    /// threads can obtain a back-reference for auto-orchestrated
611    /// follow-up actions (e.g. replica auto-bootstrap on
612    /// `NeedsRestore`).  Idempotent: subsequent calls are silent
613    /// no-ops because the inner [`OnceLock`] only accepts one set.
614    ///
615    /// Callers that wrap the env in `Arc` and want auto-bootstrap
616    /// behaviour should call this immediately after construction.
617    /// `Self::open` already does so.  Test harnesses that drive
618    /// transitions manually (`RepTestBase`) also call this so the
619    /// auto-bootstrap path is exercised in tests.
620    pub fn init_self_weak(self: &Arc<Self>) {
621        let _ = self.self_weak.set(Arc::downgrade(self));
622    }
623
624    /// Register the `ADMIN` service handler on the TCP dispatcher.
625    ///
626    /// Closes findings F7 / F8.  Holds a `Weak<Self>` so the handler
627    /// does not extend the env's lifetime.  Idempotent: re-registering
628    /// is harmless because `TcpServiceDispatcher::register` overwrites
629    /// the existing handler.
630    pub fn register_admin_service(self: &Arc<Self>) {
631        if let Some(ref dispatcher) = self.tcp_dispatcher {
632            crate::group_admin::register_admin_service(
633                dispatcher,
634                Arc::downgrade(self),
635            );
636            log::debug!(
637                "Node '{}' ADMIN service registered",
638                self.config.node_name,
639            );
640        }
641    }
642
643    /// Spawn the VLSN-index persistence daemon (F11).
644    ///
645    /// Periodically (every 2 seconds) snapshots the in-memory
646    /// `VlsnIndex` to `<env_home>/vlsn.idx` so a clean restart can
647    /// resume from where the replica left off without a full network
648    /// restore.  No-op when `config.env_home` is `None`.
649    ///
650    /// Idempotent: only one daemon is ever spawned per env.
651    pub fn start_vlsn_persistence_daemon(self: &Arc<Self>) {
652        let Some(home) = self.config.env_home.clone() else {
653            return;
654        };
655        {
656            let threads = self.io_threads.lock().unwrap();
657            if threads.iter().any(|h| {
658                h.thread()
659                    .name()
660                    .is_some_and(|n| n.starts_with("noxu-vlsn-flush-"))
661            }) {
662                return;
663            }
664        }
665
666        let vlsn_index = Arc::clone(&self.vlsn_index);
667        let name = format!("noxu-vlsn-flush-{}", self.config.node_name);
668        let me = Arc::clone(self);
669        let interval = Duration::from_secs(2);
670
671        let handle = std::thread::Builder::new()
672            .name(name)
673            .spawn(move || {
674                use std::sync::atomic::Ordering;
675                let mut last_persisted_vlsn: u64 = 0;
676                while !me.io_shutdown.load(Ordering::SeqCst)
677                    && !me.is_shutdown()
678                {
679                    std::thread::sleep(interval);
680                    if me.io_shutdown.load(Ordering::SeqCst) {
681                        break;
682                    }
683                    let latest = vlsn_index.get_latest_vlsn();
684                    if latest == last_persisted_vlsn {
685                        // Nothing new to flush.
686                        continue;
687                    }
688                    // X-2: cap the flush at the last durable checkpoint's
689                    // end LSN so the persisted VLSN index never claims
690                    // VLSNs beyond the durable B-tree state.  After a crash
691                    // the recovered tree and the index will be coherent.
692                    let cap_lsn = me
693                        .env_impl
694                        .lock()
695                        .unwrap()
696                        .as_ref()
697                        .and_then(|e| e.get_checkpointer())
698                        .map(|c| c.get_last_checkpoint_end())
699                        .unwrap_or(noxu_util::NULL_LSN);
700                    match crate::vlsn::persist::flush_to_disk_capped(
701                        &vlsn_index,
702                        &home,
703                        cap_lsn,
704                    ) {
705                        Ok(n) => {
706                            log::trace!(
707                                "vlsn-flush: persisted {} entries (latest vlsn={}, cap_lsn={:?})",
708                                n,
709                                latest,
710                                cap_lsn,
711                            );
712                            last_persisted_vlsn = latest;
713                        }
714                        Err(e) => {
715                            log::warn!(
716                                "vlsn-flush: failed to persist VLSN index to {}: {}",
717                                home.display(),
718                                e
719                            );
720                        }
721                    }
722                }
723                // Final flush on shutdown so a clean close is recoverable.
724                // Cap at the last checkpoint even for the shutdown flush.
725                let cap_lsn = me
726                    .env_impl
727                    .lock()
728                    .unwrap()
729                    .as_ref()
730                    .and_then(|e| e.get_checkpointer())
731                    .map(|c| c.get_last_checkpoint_end())
732                    .unwrap_or(noxu_util::NULL_LSN);
733                if let Err(e) = crate::vlsn::persist::flush_to_disk_capped(
734                    &vlsn_index,
735                    &home,
736                    cap_lsn,
737                ) {
738                    log::warn!(
739                        "vlsn-flush (final): failed to persist VLSN index: {}",
740                        e
741                    );
742                }
743            })
744            .expect("failed to spawn noxu-vlsn-flush thread");
745
746        self.io_threads.lock().unwrap().push(handle);
747        log::debug!(
748            "Node '{}' VLSN persistence daemon started",
749            self.config.node_name,
750        );
751    }
752
753    /// Spawn the election driver background thread.
754    ///
755    /// While the env is in `Detached` or `Unknown` state and no master
756    /// is known, the driver periodically attempts a Paxos election
757    /// against peers in `GroupService` (whose ELECTION services were
758    /// registered at `open()` time).  On success the driver calls
759    /// `become_master` (if this node is the winner) or `become_replica`
760    /// (otherwise).  On failure (no quorum), the driver waits
761    /// `config.election_timeout` and tries again.
762    ///
763    /// The driver respects `io_shutdown`; on env close the loop exits
764    /// promptly.
765    ///
766    /// Idempotent: a second call is a no-op (only one driver thread is
767    /// ever spawned per env).
768    pub fn start_election_driver(self: &Arc<Self>) {
769        use std::sync::atomic::Ordering;
770        // Reuse io_shutdown for cancellation; a successful spawn is
771        // recorded by appending to io_threads, so a duplicate call
772        // would re-add a thread — we use a one-shot `AtomicBool`
773        // sentinel placed in the io_shutdown's slot via a new field.
774        // Cheaper: a static name check on io_threads is impossible;
775        // instead, gate spawning on whether any io_thread already
776        // carries the driver name.
777        {
778            let threads = self.io_threads.lock().unwrap();
779            if threads.iter().any(|h| {
780                h.thread()
781                    .name()
782                    .is_some_and(|n| n.starts_with("noxu-election-"))
783            }) {
784                return;
785            }
786        }
787
788        let me = Arc::clone(self);
789        let name = format!("noxu-election-{}", self.config.node_name);
790        let handle = std::thread::Builder::new()
791            .name(name)
792            .spawn(move || {
793                me.run_election_loop();
794            })
795            .expect("failed to spawn election driver thread");
796        self.io_threads.lock().unwrap().push(handle);
797        log::debug!("Node '{}' election driver started", self.config.node_name,);
798        // Keep ordering sane on the io_shutdown flag.
799        let _ = self.io_shutdown.load(Ordering::SeqCst);
800    }
801
802    /// Body of the election driver loop.  Public only for tests; called
803    /// by [`Self::start_election_driver`].
804    fn run_election_loop(self: Arc<Self>) {
805        use std::sync::atomic::Ordering;
806        // Maintain an internal monotonically increasing election term.
807        // Each successful or failed round bumps the term so retries do
808        // not collide with stale acceptor promises.
809        let mut term: u64 = 1;
810
811        loop {
812            if self.io_shutdown.load(Ordering::SeqCst) {
813                return;
814            }
815            if self.is_shutdown() {
816                return;
817            }
818
819            let state = self.node_state.get_state();
820            // Stop driving once we've resolved into Master/Replica;
821            // re-arm only if the node returns to Unknown.
822            if matches!(state, NodeState::Master | NodeState::Replica) {
823                std::thread::sleep(Duration::from_millis(200));
824                continue;
825            }
826            if matches!(state, NodeState::Shutdown) {
827                return;
828            }
829
830            // Probe peers for an active master via the existing
831            // GroupService cache.  In the absence of a heartbeat path
832            // we rely on master_tracker (set by become_replica from
833            // the receive loop).
834            if let Some(master_name) = self.master_tracker.get_master()
835                && master_name != self.config.node_name
836            {
837                let _ = self.become_replica(&master_name);
838                continue;
839            }
840
841            // Snapshot peers to dial for ELECTION.
842            let peers: Vec<(String, SocketAddr)> = self
843                .group_service
844                .get_all_nodes()
845                .into_iter()
846                .filter(|n| n.name != self.config.node_name)
847                .filter_map(|n| {
848                    format!("{}:{}", n.host, n.port)
849                        .parse::<SocketAddr>()
850                        .ok()
851                        .map(|a| (n.name, a))
852                })
853                .collect();
854
855            // Build the local rep group view used by run_election to
856            // compute quorum and resolve the winner name.  Include
857            // self.
858            let group = self.local_rep_group_with_self();
859
860            // Update election state for any concurrent acceptor calls.
861            let our_vlsn = self.vlsn_index.get_latest_vlsn();
862            self.election_state.set_vlsn(our_vlsn);
863            self.election_state.set_term(term);
864            // D2: advertise our DTVLSN as the major election-ranking key.
865            self.election_state.set_dtvlsn(self.get_dtvlsn());
866
867            // Connect to each peer's ELECTION service.  Failures are
868            // tolerated: a peer that doesn't answer simply contributes
869            // no vote.  The election may still reach quorum in the
870            // remaining peers.
871            let mut channels: Vec<Arc<dyn crate::net::channel::Channel>> =
872                Vec::new();
873            for (peer_name, addr) in &peers {
874                match crate::net::service_dispatcher::connect_to_service(
875                    *addr,
876                    ELECTION_SERVICE_NAME,
877                ) {
878                    Ok(ch) => {
879                        let arc: Arc<dyn crate::net::channel::Channel> =
880                            Arc::new(ch);
881                        channels.push(arc);
882                    }
883                    Err(e) => {
884                        log::trace!(
885                            "election driver: peer {} ({}) unreachable: {}",
886                            peer_name,
887                            addr,
888                            e
889                        );
890                    }
891                }
892            }
893
894            // Resolve our own node_id from the group; if not present
895            // we cannot run an election (closed-world guard — see F22).
896            let self_node_id =
897                group.get_node(&self.config.node_name).map(|n| n.node_id());
898            let self_node_id = match self_node_id {
899                Some(id) => id,
900                None => {
901                    log::warn!(
902                        "election driver: node '{}' not registered in \
903                         own group view; sleeping",
904                        self.config.node_name
905                    );
906                    std::thread::sleep(Duration::from_millis(200));
907                    continue;
908                }
909            };
910
911            log::debug!(
912                "election driver on '{}': starting term={} with {} peers",
913                self.config.node_name,
914                term,
915                channels.len(),
916            );
917            let outcome = crate::elections::paxos::run_election_with_phi_dtvlsn(
918                self_node_id,
919                &self.config.node_name,
920                &group,
921                &channels,
922                our_vlsn,
923                /* priority */ 1,
924                term,
925                /* own_dtvlsn (D2 major ranking key) */
926                self.get_dtvlsn(),
927                None,
928                std::time::Duration::from_millis(500),
929            );
930
931            match outcome {
932                Some(winner_id) if winner_id == self_node_id => {
933                    if let Err(e) = self.become_master(term) {
934                        log::warn!(
935                            "election driver: become_master failed: {}",
936                            e
937                        );
938                    } else {
939                        log::info!(
940                            "election driver: '{}' became master at term {}",
941                            self.config.node_name,
942                            term,
943                        );
944                    }
945                }
946                Some(winner_id) => {
947                    if let Some(winner_node) = group
948                        .get_nodes()
949                        .into_iter()
950                        .find(|n| n.node_id() == winner_id)
951                    {
952                        if let Err(e) = self.become_replica(&winner_node.name) {
953                            log::warn!(
954                                "election driver: become_replica failed: {}",
955                                e
956                            );
957                        } else {
958                            log::info!(
959                                "election driver: '{}' became replica of '{}' at term {}",
960                                self.config.node_name,
961                                winner_node.name,
962                                term,
963                            );
964                        }
965                    }
966                }
967                None => {
968                    log::debug!(
969                        "election driver on '{}' term={}: no quorum",
970                        self.config.node_name,
971                        term,
972                    );
973                }
974            }
975
976            term = term.saturating_add(1);
977            // Back off so we don't pin the loop on transient failures.
978            std::thread::sleep(
979                self.config.election_timeout.min(Duration::from_millis(500)),
980            );
981        }
982    }
983
984    /// Internal: a `RepGroup` snapshot that includes self.
985    fn local_rep_group_with_self(&self) -> crate::rep_group::RepGroup {
986        let mut group = self.get_rep_group();
987        // Ensure self is present in the group view; the
988        // group_service does not auto-register the local node.
989        if group.get_node(&self.config.node_name).is_none() {
990            let mut self_node = crate::rep_node::RepNode::new(
991                self.config.node_name.clone(),
992                self.config.node_type,
993                self.config.node_host.clone(),
994                self.config.node_port,
995                /* node_id */ 0,
996            );
997            // Stable self node_id derived from the name hash so
998            // re-creations in the same process don't collide.
999            use std::hash::{Hash, Hasher};
1000            let mut hasher = std::collections::hash_map::DefaultHasher::new();
1001            self.config.node_name.hash(&mut hasher);
1002            // Restrict to a u32 range and avoid 0 (reserved for
1003            // "unknown").
1004            let id = ((hasher.finish() as u32) | 1).max(1);
1005            self_node.node_id = id;
1006            group.add_node(self_node);
1007        }
1008        group
1009    }
1010
1011    /// Return the socket address the TCP service dispatcher is bound to.
1012    ///
1013    /// This may differ from the configured `node_port` when port 0 is used
1014    /// (the OS assigns an ephemeral port). Returns `None` if the dispatcher
1015    /// could not be started (e.g. the address is not resolvable).
1016    pub fn bound_addr(&self) -> Option<SocketAddr> {
1017        self.bound_addr
1018    }
1019
1020    /// Wire a live `EnvironmentImpl` into this replicated environment.
1021    ///
1022    /// After this call, state transitions (`become_master`, `become_replica`)
1023    /// will spawn real feeder/receiver I/O threads backed by the live log.
1024    ///
1025    /// If the RESTORE service was not registered at construction time (because
1026    /// `config.env_home` was `None`), it is registered here using the
1027    /// environment's actual home path.  This mirrors`RepNode.envSetup()`
1028    /// which registers the restore handler during environment wiring.
1029    ///
1030    /// Environment reference wiring.
1031    /// `EnvironmentImpl` via `RepImpl.repNode.envImpl` in HA.
1032    pub fn with_environment(&self, env: Arc<EnvironmentImpl>) {
1033        // Register RESTORE service lazily if not already done.
1034        if !self.restore_registered.load(Ordering::SeqCst)
1035            && let Some(ref dispatcher) = self.tcp_dispatcher
1036        {
1037            let env_home = env.get_env_home().to_path_buf();
1038            let restore_server = NetworkRestoreServer::new(env_home.clone());
1039            dispatcher.register(RESTORE_SERVICE_NAME, Arc::new(restore_server));
1040            self.restore_registered.store(true, Ordering::SeqCst);
1041            log::debug!(
1042                "Node '{}' RESTORE service registered via with_environment \
1043                 (env_home={})",
1044                self.config.node_name,
1045                env_home.display(),
1046            );
1047        }
1048
1049        // X-14: rebuild the VLSN index from recovery-replayed LN entries.
1050        // After a crash the on-disk vlsn.idx may be stale (either ahead of
1051        // the recovered B-tree, or behind if vlsn.idx was not flushed
1052        // after the last checkpoint).  Re-registering all (vlsn, lsn) pairs
1053        // from the redo pass gives a consistent in-memory index.
1054        if !env.recovery_vlsns.is_empty() {
1055            log::info!(
1056                "Node '{}': rebuilding VLSN index from {} recovered entries",
1057                self.config.node_name,
1058                env.recovery_vlsns.len(),
1059            );
1060            for &(vlsn, lsn_u64) in &env.recovery_vlsns {
1061                let lsn = noxu_util::Lsn::from_u64(lsn_u64);
1062                self.vlsn_index.register(
1063                    vlsn,
1064                    lsn.file_number(),
1065                    lsn.file_offset(),
1066                );
1067            }
1068        }
1069
1070        // X-1: truncate the VLSN index to the rollback matchpoint if recovery
1071        // detected a completed rollback period.  The matchpoint is the highest
1072        // LSN that is still valid after the rollback; entries with higher VLSNs
1073        // correspond to data that was rolled back and must not appear in the
1074        // index.
1075        if let Some(matchpoint_lsn_u64) = env.recovery_rollback_matchpoint {
1076            // Find the latest VLSN whose LSN is at or before the matchpoint.
1077            // Scan the recovered VLSN pairs (sorted ascending) to find the
1078            // boundary.
1079            let safe_vlsn = env
1080                .recovery_vlsns
1081                .iter()
1082                .rev()
1083                .find(|&&(_, lsn_u64)| lsn_u64 <= matchpoint_lsn_u64)
1084                .map(|&(vlsn, _)| vlsn)
1085                .unwrap_or(0);
1086            log::info!(
1087                "Node '{}': truncating VLSN index after vlsn={} \
1088                 (rollback matchpoint lsn={:#x})",
1089                self.config.node_name,
1090                safe_vlsn,
1091                matchpoint_lsn_u64,
1092            );
1093            self.vlsn_index.truncate_after(safe_vlsn);
1094        }
1095
1096        *self.env_impl.lock().unwrap() = Some(Arc::clone(&env));
1097
1098        // C-C2b: install the VLSN counter so log_txn_commit writes
1099        // VLSN-tagged headers.  When become_master then spawns an
1100        // EnvironmentLogScanner-backed FeederRunner, it will find these
1101        // entries and auto-feed them to replicas without any
1102        // replicate_entry call from the application.
1103        env.set_replication_vlsn_counter(Arc::clone(&self.wal_vlsn_counter));
1104    }
1105
1106    /// Get the current node state.
1107    ///
1108    ///
1109    ///
1110    /// Returns the current state of the node associated with this replication
1111    /// environment. If the caller's intent is to track the state of the node,
1112    /// `StateChangeListener` may be a more convenient and efficient approach.
1113    pub fn get_state(&self) -> NodeState {
1114        self.node_state.get_state()
1115    }
1116
1117    /// Check if this node is the master.
1118    ///
1119    /// Returns true if the node's current state is Master.
1120    pub fn is_master(&self) -> bool {
1121        self.node_state.get_state() == NodeState::Master
1122    }
1123
1124    /// Returns true if this node is an *authoritative* master (D4, JE
1125    /// `ElectionQuorum.isAuthoritativeMaster`): it is the group master AND it
1126    /// is still connected to enough replicas that, including itself, a
1127    /// SIMPLE_MAJORITY quorum is present.
1128    ///
1129    /// A master on the minority side of a network partition is NOT
1130    /// authoritative — it must not claim the special election ranking
1131    /// (`MASTER_RANKING`) nor (eventually) continue accepting writes, so the
1132    /// majority side can elect a fresh master without it competing
1133    /// (split-brain prevention).
1134    ///
1135    /// "Active replica count" = the number of currently-connected push-feeder
1136    /// runners serving *electable* peers (Monitors/Secondaries do not count
1137    /// toward the election quorum). `+ 1` for this master itself.
1138    pub fn is_authoritative_master(&self) -> bool {
1139        if !self.is_master() {
1140            return false;
1141        }
1142        let group = self.get_rep_group();
1143        // Total electable nodes (incl. self) — peers + this master.
1144        let electable_total: usize = group
1145            .get_nodes()
1146            .iter()
1147            .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
1148            .count()
1149            + 1; // +1 for self/master (not registered as a peer)
1150
1151        // Active replicas = connected feeder runners whose peer is electable.
1152        let active_electable_replicas: usize = {
1153            let runners = self.active_feeder_runners.lock().unwrap();
1154            runners
1155                .keys()
1156                .filter(|name| {
1157                    group
1158                        .get_node(name)
1159                        .map(|n| {
1160                            n.node_type == crate::node_type::NodeType::Electable
1161                        })
1162                        .unwrap_or(false)
1163                })
1164                .count()
1165        };
1166        Self::authoritative_quorum_met(
1167            active_electable_replicas,
1168            electable_total,
1169        )
1170    }
1171
1172    /// Pure SIMPLE_MAJORITY quorum check for `is_authoritative_master` (JE
1173    /// `ElectionQuorum.isAuthoritativeMaster`): `(activeReplicas + 1) >=
1174    /// quorumSize` where `quorumSize = electableTotal / 2 + 1`.
1175    fn authoritative_quorum_met(
1176        active_electable_replicas: usize,
1177        electable_total: usize,
1178    ) -> bool {
1179        let quorum_size = electable_total / 2 + 1;
1180        (active_electable_replicas + 1) >= quorum_size
1181    }
1182
1183    /// Check if this node is a replica.
1184    ///
1185    /// Returns true if the node's current state is Replica.
1186    pub fn is_replica(&self) -> bool {
1187        self.node_state.get_state() == NodeState::Replica
1188    }
1189
1190    /// Returns true if the node is currently participating in the group
1191    /// as a Replica or a Master.
1192    pub fn is_active(&self) -> bool {
1193        self.node_state.get_state().is_active()
1194    }
1195
1196    /// Get the node name.
1197    ///
1198    ///
1199    ///
1200    /// Returns the unique name used to identify this replicated environment.
1201    pub fn get_node_name(&self) -> &str {
1202        self.config.node_name.as_str()
1203    }
1204
1205    /// Get the group name.
1206    ///
1207    /// Returns the name of the replication group this node belongs to.
1208    pub fn get_group_name(&self) -> &str {
1209        self.config.group_name.as_str()
1210    }
1211
1212    /// Get the current master (if known).
1213    ///
1214    /// Returns the name of the node that is currently the master, or None
1215    /// if the master is not known (e.g. the node is in the Unknown or
1216    /// Detached state).
1217    pub fn get_master_name(&self) -> Option<String> {
1218        self.master_tracker.get_master()
1219    }
1220
1221    /// Get the replication group info.
1222    ///
1223    ///
1224    ///
1225    /// Returns a description of the replication group as known by this node.
1226    /// The replicated group metadata is stored in a replicated database and
1227    /// updates are propagated by the current master node to all replicas. If
1228    /// this node is not the master, it is possible for its description of the
1229    /// group to be out of date.
1230    pub fn get_group(&self) -> &GroupService {
1231        &self.group_service
1232    }
1233
1234    /// Add a peer node to the replication group at runtime.
1235    ///
1236    /// The node is registered in the `GroupService` so elections and quorum
1237    /// calculations immediately reflect the new membership.
1238    pub fn add_peer(&self, node: crate::rep_node::RepNode) -> Result<()> {
1239        use crate::group_service::NodeInfo;
1240        use std::time::Instant;
1241
1242        let info = NodeInfo {
1243            name: node.name.clone(),
1244            node_type: node.node_type,
1245            host: node.host.clone(),
1246            port: node.port,
1247            node_id: node.node_id,
1248            joined_at: Instant::now(),
1249            last_seen: Instant::now(),
1250            is_active: true,
1251            known_vlsn: 0,
1252            log_range: None,
1253            read_capacity_pct: node.read_capacity_pct,
1254            write_capacity_pct: node.write_capacity_pct,
1255            latency_hint_ms: node.latency_hint_ms,
1256        };
1257        self.group_service.add_node(info)?;
1258        log::info!(
1259            "Node '{}': added peer '{}' ({}:{}) to group '{}'",
1260            self.config.node_name,
1261            node.name,
1262            node.host,
1263            node.port,
1264            self.config.group_name,
1265        );
1266
1267        // F9: if we are the current master, immediately register a
1268        // `Feeder` tracker for the new peer so AckTracker bookkeeping
1269        // and downstream pull-based streaming work without a forced
1270        // re-election.
1271        if self.is_master()
1272            && (node.node_type == crate::node_type::NodeType::Electable
1273                || node.node_type == crate::node_type::NodeType::Secondary)
1274        {
1275            let mut feeders = self.feeders.write();
1276            if !feeders.iter().any(|f| f.get_replica_name() == node.name) {
1277                feeders.push(Feeder::new(node.name.clone()));
1278                log::debug!(
1279                    "Node '{}' (master): dispatched Feeder for new peer '{}'",
1280                    self.config.node_name,
1281                    node.name,
1282                );
1283            }
1284        }
1285        Ok(())
1286    }
1287
1288    /// Remove a peer node from the replication group by name.
1289    ///
1290    /// The node is deregistered from the `GroupService`.  Elections initiated
1291    /// after this call will not include the removed node in quorum calculations.
1292    pub fn remove_peer(&self, name: &str) -> Result<()> {
1293        self.group_service.remove_node(name)?;
1294        log::info!(
1295            "Node '{}': removed peer '{}' from group '{}'",
1296            self.config.node_name,
1297            name,
1298            self.config.group_name,
1299        );
1300        Ok(())
1301    }
1302
1303    /// Update the capacity and latency metadata of an existing peer.
1304    ///
1305    /// Only the following fields are updated from `node`:
1306    ///   - `read_capacity_pct`
1307    ///   - `write_capacity_pct`
1308    ///   - `latency_hint_ms`
1309    ///
1310    /// The node's identity (name, address, port, node_type) is preserved.
1311    /// Safe to call while replication is active.
1312    ///
1313    /// If the quorum policy is `Flexible` or `Expression`, the quorum system
1314    /// is rebuilt to reflect the new capacity/latency weights.
1315    ///
1316    /// # Note
1317    ///
1318    /// `update_peer_metadata` does not currently re-run
1319    /// `QuorumPolicy::validate(electable_count)` after the metadata
1320    /// change.  An LP-optimal `Expression` quorum that was safe before
1321    /// the update may no longer satisfy the intersection property
1322    /// afterwards.  Until automatic revalidation lands, deployments
1323    /// using `QuorumPolicy::Expression` should call
1324    /// `quorum_policy().validate(get_rep_group().electable_count())`
1325    /// on the returned `RepGroup` after every metadata change and
1326    /// fail the operator-facing operation if validation reports
1327    /// unsafety.
1328    pub fn update_peer_metadata(
1329        &self,
1330        name: &str,
1331        node: crate::rep_node::RepNode,
1332    ) -> Result<()> {
1333        self.group_service.update_node_metadata(
1334            name,
1335            node.read_capacity_pct,
1336            node.write_capacity_pct,
1337            node.latency_hint_ms,
1338        )?;
1339        log::info!(
1340            "Node '{}': updated metadata for peer '{}' \
1341             (read_cap={}, write_cap={}, latency={}ms)",
1342            self.config.node_name,
1343            name,
1344            node.read_capacity_pct,
1345            node.write_capacity_pct,
1346            node.latency_hint_ms,
1347        );
1348        Ok(())
1349    }
1350
1351    /// Returns a snapshot of the current replication group as a `RepGroup`.
1352    ///
1353    /// The snapshot reflects the state at the time of the call; subsequent
1354    /// `add_peer` / `remove_peer` calls are not reflected in it.
1355    pub fn get_rep_group(&self) -> crate::rep_group::RepGroup {
1356        use crate::rep_group::RepGroup;
1357
1358        let mut group = RepGroup::new(
1359            self.config.group_name.clone(),
1360            self.group_service.get_group_id(),
1361        );
1362        for info in self.group_service.get_all_nodes() {
1363            let mut node = crate::rep_node::RepNode::new(
1364                info.name.clone(),
1365                info.node_type,
1366                info.host.clone(),
1367                info.port,
1368                info.node_id,
1369            );
1370            node.read_capacity_pct = info.read_capacity_pct;
1371            node.write_capacity_pct = info.write_capacity_pct;
1372            node.latency_hint_ms = info.latency_hint_ms;
1373            group.add_node(node);
1374        }
1375        group
1376    }
1377
1378    /// Get the replication configuration.
1379    ///
1380    ///
1381    ///
1382    /// Returns the replication configuration that has been used to create this
1383    /// environment.
1384    pub fn get_config(&self) -> &RepConfig {
1385        &self.config
1386    }
1387
1388    /// Get the current VLSN range on this node.
1389    ///
1390    /// Returns the range of VLSNs currently available on this node.
1391    pub fn get_vlsn_range(&self) -> VlsnRange {
1392        self.vlsn_index.get_range()
1393    }
1394
1395    /// Get the latest VLSN.
1396    ///
1397    /// Returns the most recent VLSN registered on this node.
1398    pub fn get_current_vlsn(&self) -> u64 {
1399        self.vlsn_index.get_latest_vlsn()
1400    }
1401
1402    /// The replica-side replication stream state (master high-water, applied
1403    /// VLSN, lag).  Used by the consistency read-gate to learn the master's
1404    /// latest known commit VLSN (JE `ConsistencyTracker.masterTxnEndVLSN`,
1405    /// updated by heartbeats).
1406    pub fn replica_stream(&self) -> &ReplicaStream {
1407        &self.replica_stream
1408    }
1409
1410    /// REP-10 (B): mint a [`CommitToken`] for the most recent commit on this
1411    /// master.
1412    ///
1413    /// Port of `MasterTxn.getCommitToken`: returns
1414    /// `new CommitToken(envUUID, commitVLSN.getSequence())`.  A client that
1415    /// just performed a write on the master calls this to obtain the token it
1416    /// will hand to a subsequent replica read
1417    /// (`Transaction.getCommitToken`).  Returns `None` on a non-master or when
1418    /// no commit VLSN exists yet (JE returns `null` when `commitVLSN.isNull`).
1419    ///
1420    /// The token's VLSN is the master's latest assigned VLSN — the same
1421    /// `wal_vlsn_counter` high-water the ack gate keys on (the commit was
1422    /// logged immediately before this call).
1423    pub fn commit_token(&self) -> Option<crate::CommitToken> {
1424        if !self.is_master() {
1425            return None;
1426        }
1427        let vlsn = self.wal_vlsn_counter.load(Ordering::Acquire);
1428        crate::CommitToken::new(self.config.group_name.clone(), vlsn)
1429    }
1430
1431    /// REP-10 (C): the read-gate. Enforce a replica read-consistency policy
1432    /// before a read transaction proceeds.
1433    ///
1434    /// Port of `ReplicaConsistencyPolicy.ensureConsistency` as invoked from a
1435    /// replica `beginTransaction` (`RepImpl.checkConsistency` /
1436    /// `Replica.getConsistencyTracker().awaitVLSN`).  Called by the replica
1437    /// env's transaction-begin / read path.
1438    ///
1439    /// - `policy_override`: a per-transaction policy (JE
1440    ///   `TransactionConfig.setConsistencyPolicy`).  When `None`, the node's
1441    ///   configured default is used (`ReplicationConfig.setConsistencyPolicy`
1442    ///   — [`RepConfig::consistency_policy`]).
1443    ///
1444    /// On a master, or when the effective policy is
1445    /// [`ConsistencyPolicy::NoConsistency`], this returns immediately so
1446    /// existing behaviour is unchanged unless a policy is set.  On a replica
1447    /// with a non-`NoConsistency` policy it BLOCKS until the replica has
1448    /// replayed far enough or the policy timeout expires (a clean
1449    /// [`RepError`], never a hang).
1450    pub fn begin_read_consistency(
1451        &self,
1452        policy_override: Option<&crate::ConsistencyPolicy>,
1453    ) -> Result<()> {
1454        // Resolve the effective policy: per-txn override else node default.
1455        let default_policy = self.config.consistency_policy.clone();
1456        let policy = policy_override.unwrap_or(&default_policy);
1457
1458        // NoConsistency never blocks (the master path also lands here).
1459        if matches!(policy, crate::ConsistencyPolicy::NoConsistency) {
1460            return Ok(());
1461        }
1462
1463        // A non-No policy only makes sense on a replica with a live replay
1464        // (its last_applied_vlsn is the wait predicate).  Without a tracker
1465        // there is nothing to wait on — treat as immediately consistent
1466        // rather than block forever (e.g. on the master, which is by
1467        // definition fully current).
1468        let tracker = self.consistency_tracker.lock().unwrap().clone();
1469        let Some(tracker) = tracker else {
1470            return Ok(());
1471        };
1472
1473        // Surface the master's latest known VLSN for the time policy
1474        // (heartbeat / feeder high-water).  JE ConsistencyTracker tracks this
1475        // via trackHeartbeat; here we read the replica_stream high-water.
1476        let master_vlsn = self.replica_stream.get_master_vlsn();
1477        if master_vlsn > 0 {
1478            tracker.set_master_vlsn(master_vlsn);
1479        }
1480
1481        tracker.await_consistency(policy)
1482    }
1483
1484    /// REP-10 (C) test seam: install a [`ConsistencyTracker`] over an existing
1485    /// `last_applied_vlsn` handle, exactly as `become_replica` does when it
1486    /// starts the live replay thread.
1487    ///
1488    /// Lets a test drive a real [`noxu_dbi::ReplicaReplay`] and exercise
1489    /// [`Self::begin_read_consistency`] end-to-end without standing up TCP
1490    /// feeder/receiver threads.  Not part of the production API.
1491    #[cfg(any(test, feature = "test-harness"))]
1492    pub fn install_consistency_tracker_for_test(
1493        &self,
1494        last_applied_vlsn: std::sync::Arc<std::sync::atomic::AtomicU64>,
1495    ) -> crate::ConsistencyTracker {
1496        let tracker = crate::ConsistencyTracker::new(last_applied_vlsn);
1497        *self.consistency_tracker.lock().unwrap() = Some(tracker.clone());
1498        tracker
1499    }
1500
1501    /// REP-1 STEP 5 (D): run a live syncup against `feeder` and, if this
1502    /// replica's tail diverged, ROLL IT BACK to the common matchpoint instead
1503    /// of falling back to a network restore.
1504    ///
1505    /// Port of the replica's side of JE `ReplicaFeederSyncup.execute`:
1506    /// `findMatchpoint` → `verifyRollback` → `replay.rollback` →
1507    /// `vlsnIndex.truncateFromTail` → resume streaming from `matchpoint + 1`.
1508    ///
1509    /// `feeder` is the master's [`crate::stream::syncup::SyncupView`] (built
1510    /// from its VLSN index, or exchanged over the syncup wire protocol in
1511    /// [`crate::stream::syncup_protocol`]). The decision uses the same pure
1512    /// core the protocol drives: `find_matchpoint` + `verify_rollback`.
1513    ///
1514    /// Returns:
1515    /// - [`SyncupAction::RolledBack`] — the divergent tail was truncated to
1516    ///   the matchpoint; resume streaming from `start_vlsn`. The non-diverged
1517    ///   case (matchpoint == last VLSN) returns `RolledBack` with an empty
1518    ///   tail and is a no-op rollback.
1519    /// - [`SyncupAction::NeedsRestore`] — `verify_rollback` selected
1520    ///   NetworkRestore (no common matchpoint) or HardRecovery (the rollback
1521    ///   would cross a committed/aborted txn); the caller must network-restore
1522    ///   per JE.
1523    ///
1524    /// The non-diverged fast path (the replica's range is a prefix of the
1525    /// feeder's) is still served by the range-check `negotiate_syncup`
1526    /// (`SyncupResult::CanServe`) in the streaming path; this method is the
1527    /// DIVERGED case.
1528    pub fn syncup_with_feeder(
1529        &self,
1530        feeder: &dyn crate::stream::syncup::SyncupView,
1531    ) -> Result<SyncupAction> {
1532        // Build the replica's SyncupView. When a real LogManager is wired,
1533        // re-read the log (SyncupLogView) so the per-VLSN fingerprint is the
1534        // actual record checksum (JE ReplicaSyncupReader). Otherwise (the
1535        // VLSN-index-only harness model) fall back to the index view, whose
1536        // fingerprint is the LSN.
1537        let log_view: Option<crate::stream::syncup_reader::SyncupLogView> =
1538            self.env_impl.lock().unwrap().clone().and_then(|env| {
1539                if let Some(lm) = env.get_log_manager() {
1540                    // Flush so all VLSN-tagged entries are on disk before the
1541                    // backward re-read (JE flushNoSync in initScan).
1542                    let _ = lm.flush_sync();
1543                }
1544                crate::stream::syncup_reader::SyncupLogView::scan(
1545                    env.get_env_home(),
1546                )
1547            });
1548        let index_view = VlsnIndexView::from_index(&self.vlsn_index);
1549        let replica_view: &dyn crate::stream::syncup::SyncupView =
1550            match &log_view {
1551                Some(v) => v,
1552                None => &index_view,
1553            };
1554
1555        let range = self.vlsn_index.get_range();
1556        let last_sync = range.get_last_sync();
1557        let last_txn_end = range.get_last_txn_end();
1558        let to_vlsn = |v: u64| {
1559            if v == 0 {
1560                noxu_util::NULL_VLSN
1561            } else {
1562                noxu_util::Vlsn::new(v as i64)
1563            }
1564        };
1565
1566        // Step 1: find the matchpoint (JE findMatchpoint).
1567        let matchpoint = find_matchpoint(replica_view, feeder);
1568
1569        // numPassedCommits: count of txn ends strictly above the matchpoint.
1570        // When we re-read the log, count them exactly; otherwise rely on the
1571        // numeric `lastTxnEnd <= matchpoint` test in verify_rollback (which
1572        // matches JE when sync points == txn ends).
1573        let num_passed_commits = match (&log_view, &matchpoint) {
1574            (Some(v), Matchpoint::Found { vlsn, .. }) => {
1575                v.num_passed_commits(*vlsn)
1576            }
1577            _ => 0,
1578        };
1579        let decision = verify_rollback(
1580            &matchpoint,
1581            to_vlsn(last_txn_end),
1582            to_vlsn(last_sync),
1583            num_passed_commits,
1584        );
1585
1586        match decision {
1587            RollbackDecision::RollbackToMatchpoint {
1588                matchpoint_vlsn,
1589                start_vlsn,
1590            } => {
1591                let matchpoint_lsn = match &matchpoint {
1592                    Matchpoint::Found { lsn, .. } => *lsn,
1593                    Matchpoint::None => 0,
1594                };
1595                // Collect the rolled-back LSNs (VLSNs strictly above the
1596                // matchpoint). When the real log was re-read, use its EXACT
1597                // per-VLSN LSNs so make-invisible flips the right header bytes
1598                // (the sparse VLSN index only stores boundary/last LSNs).
1599                let mp = matchpoint_vlsn.sequence().max(0) as u64;
1600                let rollback_lsns: Vec<noxu_util::Lsn> = match &log_view {
1601                    Some(v) => v
1602                        .entries()
1603                        .filter(|(vlsn, _)| (vlsn.sequence() as u64) > mp)
1604                        .map(|(_, e)| noxu_util::Lsn::from_u64(e.lsn))
1605                        .collect(),
1606                    None => self
1607                        .vlsn_index
1608                        .snapshot_entries()
1609                        .into_iter()
1610                        .filter(|(vlsn, _, _)| *vlsn > mp)
1611                        .map(|(_, file, offset)| {
1612                            noxu_util::Lsn::new(file, offset)
1613                        })
1614                        .collect(),
1615                };
1616                self.execute_rollback(mp, matchpoint_lsn, &rollback_lsns)?;
1617                Ok(SyncupAction::RolledBack {
1618                    matchpoint_vlsn: mp,
1619                    start_vlsn: start_vlsn.sequence().max(0) as u64,
1620                })
1621            }
1622            RollbackDecision::HardRecovery { .. }
1623            | RollbackDecision::NetworkRestore => {
1624                Ok(SyncupAction::NeedsRestore)
1625            }
1626        }
1627    }
1628
1629    /// Execute the durable + in-memory rollback to `matchpoint_vlsn`
1630    /// (LSN `matchpoint_lsn`). Port of JE `Replay.rollback` +
1631    /// `vlsnIndex.truncateFromTail`.
1632    ///
1633    /// Durable steps (RollbackStart/End + make-invisible + fsync) go through
1634    /// [`noxu_recovery::rollback`] when a `LogManager` is wired; the VLSN index
1635    /// is always truncated to the matchpoint so the reported range matches the
1636    /// rolled-back state and streaming resumes from `matchpoint + 1`.
1637    fn execute_rollback(
1638        &self,
1639        matchpoint_vlsn: u64,
1640        matchpoint_lsn: u64,
1641        rollback_lsns: &[noxu_util::Lsn],
1642    ) -> Result<()> {
1643        // Durable rollback (RollbackStart … make-invisible … RollbackEnd) when
1644        // a live LogManager is available. The harness-level env (VLSN-index
1645        // only, no LogManager) skips the on-disk steps; the index truncation
1646        // below is what makes the replica converge in that model.
1647        if let Some(env) = self.env_impl.lock().unwrap().clone()
1648            && let Some(log_mgr) = env.get_log_manager()
1649            && matchpoint_lsn != 0
1650        {
1651            let mp_lsn = noxu_util::Lsn::from_u64(matchpoint_lsn);
1652            // active_txn_ids: the harness/VLSN-index model has no live txn
1653            // table here; the durable RollbackStart records an empty set, and
1654            // the per-txn gating (REP-1 STEP 2) applies during recovery when
1655            // the analysis pass rebuilds the active set. A future pass can
1656            // thread the live ReplayTxn ids through (JE
1657            // localActiveTxns.keySet()).
1658            noxu_recovery::rollback(
1659                &log_mgr,
1660                noxu_util::Vlsn::new(matchpoint_vlsn as i64),
1661                mp_lsn,
1662                Vec::new(),
1663                rollback_lsns,
1664            )
1665            .map_err(|e| {
1666                RepError::DatabaseError(format!(
1667                    "live rollback to matchpoint failed: {e}"
1668                ))
1669            })?;
1670        }
1671
1672        // JE vlsnIndex.truncateFromTail(startVLSN, matchpointLSN): drop the
1673        // divergent VLSN tail so the reported range matches the recovered
1674        // state and streaming resumes from matchpoint + 1.
1675        self.vlsn_index.truncate_after(matchpoint_vlsn);
1676
1677        log::info!(
1678            "Node '{}': live syncup rolled back to matchpoint vlsn={} \
1679             (lsn={:#x}); {} tail entries truncated",
1680            self.config.node_name,
1681            matchpoint_vlsn,
1682            matchpoint_lsn,
1683            rollback_lsns.len(),
1684        );
1685        Ok(())
1686    }
1687
1688    /// Test-only: clone the env's SHARED VLSN index `Arc`.
1689    ///
1690    /// REP-6: the replica receive loop (`become_replica` ->
1691    /// `EnvironmentLogWriter`) must feed THIS index — the one
1692    /// `get_vlsn_range`, `flush_to_disk`, and election ranking read — not a
1693    /// throwaway. Tests use this to build a writer the same way
1694    /// `become_replica` does and assert the shared index advances.
1695    #[cfg(feature = "test-harness")]
1696    pub fn vlsn_index_arc(&self) -> Arc<crate::vlsn::vlsn_index::VlsnIndex> {
1697        Arc::clone(&self.vlsn_index)
1698    }
1699
1700    /// Return the list of replica names that currently have a `Feeder`
1701    /// tracker on this (master) node.
1702    ///
1703    /// Used by tests and operator tooling.  The returned list reflects
1704    /// the master's view at the time of the call; subsequent
1705    /// `add_peer`/`remove_peer` calls may change it.
1706    pub fn feeder_replica_names(&self) -> Vec<String> {
1707        self.feeders.read().iter().map(|f| f.get_replica_name()).collect()
1708    }
1709
1710    /// Number of downstream connections this node has served via the JE
1711    /// `Feeder`/`MasterFeederSource` mechanism (`FeederRunner +
1712    /// EnvironmentLogScanner` reading this node's OWN WAL).
1713    ///
1714    /// A non-zero value PROVES this node fed a downstream replica by the
1715    /// SAME mechanism the master uses — a cascading replica and the master
1716    /// run the identical `PeerFeederService` → `FeederRunner` →
1717    /// `EnvironmentLogScanner` path (JE `FeederManager` → `Feeder` →
1718    /// `MasterFeederSource`).  Used by the chained-replication test to assert
1719    /// the cascade does NOT use the in-memory pull fallback.
1720    pub fn wal_feeds_served(&self) -> u64 {
1721        self.wal_feeds_served.load(std::sync::atomic::Ordering::SeqCst)
1722    }
1723
1724    // -----------------------------------------------------------------------
1725    // C-C2 — active push feeder API
1726    // -----------------------------------------------------------------------
1727
1728    /// Register a channel for pushing log entries to a specific replica.
1729    ///
1730    /// When [`Self::become_master`] is called — or if the node is **already
1731    /// master** — a [`FeederRunner`] background thread is immediately spawned
1732    /// for this channel.  The thread reads from a dedicated in-memory queue
1733    /// that is fed by [`Self::replicate_entry`] / [`Self::apply_entry`], and
1734    /// sends framed log entries to the replica over `channel`.  Acks sent
1735    /// back by the replica are visible via
1736    /// [`Self::active_feeder_runner_acked_vlsn`].
1737    ///
1738    /// # Production vs. test use
1739    ///
1740    /// *Production*: pass a [`crate::net::TcpChannel`] connected to the
1741    /// replica's inbound feeder service.  
1742    /// *Tests*: pass one half of a [`crate::net::LocalChannelPair`].
1743    ///
1744    /// # Note on push vs. pull
1745    ///
1746    /// Registering a channel activates the **push** path: the master
1747    /// initiates and owns the feeder connection.  The existing **pull** path
1748    /// (`PeerFeederService` / `catch_up_from_peer`) continues to operate in
1749    /// parallel for replicas that connect proactively.  Do not register a
1750    /// channel for a replica that already connects via the pull path, or
1751    /// entries may be delivered twice.
1752    ///
1753    /// If `become_master` was called *before* registering the channel, call
1754    /// this method afterward; it will spawn the FeederRunner immediately.
1755    pub fn register_feeder_channel(
1756        &self,
1757        replica_name: String,
1758        channel: Arc<dyn crate::net::Channel>,
1759    ) {
1760        {
1761            let mut ch = self.feeder_channels.lock().unwrap();
1762            ch.insert(replica_name.clone(), Arc::clone(&channel));
1763        }
1764        if self.is_master() {
1765            self.spawn_feeder_runner(replica_name, channel);
1766        }
1767    }
1768
1769    /// Return the last VLSN acknowledged by the FeederRunner for `replica_name`.
1770    ///
1771    /// Returns `0` if no FeederRunner is currently active for that replica
1772    /// (either `become_master` was not called yet, or no channel was
1773    /// registered).  Use this to poll catch-up progress before shutdown.
1774    pub fn active_feeder_runner_acked_vlsn(&self, replica_name: &str) -> u64 {
1775        self.active_feeder_runners
1776            .lock()
1777            .unwrap()
1778            .get(replica_name)
1779            .map(|r| r.known_replica_vlsn())
1780            .unwrap_or(0)
1781    }
1782
1783    /// Spawn a FeederRunner thread for `replica_name` using `channel`.
1784    ///
1785    /// Creates a dedicated `PeerLogScanner` queue for the replica, registers
1786    /// it in `feeder_queues` so that future `replicate_entry` / `apply_entry`
1787    /// calls fan out into it, spawns the `FeederRunner::run` loop, and
1788    /// records the `Arc<FeederRunner>` in `active_feeder_runners`.
1789    ///
1790    /// Idempotent: if a FeederRunner is already active for `replica_name`
1791    /// (from a prior `become_master` call), it is replaced — the old channel
1792    /// should have been closed already via `close()`.
1793    ///
1794    /// **WAL-scanner auto-feed path (C-C2b)**: when a live `EnvironmentImpl`
1795    /// has been wired via `with_environment`, the FeederRunner thread uses an
1796    /// `EnvironmentLogScanner` as its source.  Every `log_txn_commit` on the
1797    /// master writes a VLSN-tagged WAL entry (22-byte header); the scanner
1798    /// finds these entries and streams them to the replica automatically,
1799    /// without any `replicate_entry` call from the application.
1800    ///
1801    /// **Fallback path**: when no `EnvironmentImpl` is wired the runner reads
1802    /// from the in-memory `PeerLogScanner` queue populated by
1803    /// `replicate_entry` / `apply_entry` — the previous manual behaviour.
1804    fn spawn_feeder_runner(
1805        &self,
1806        replica_name: String,
1807        channel: Arc<dyn crate::net::Channel>,
1808    ) {
1809        // Dedicated entry queue: entries flowing from this master reach the
1810        // FeederRunner without competing with PeerFeederService.
1811        let queue = Arc::new(PeerLogScanner::new());
1812        {
1813            self.feeder_queues
1814                .write()
1815                .unwrap()
1816                .insert(replica_name.clone(), Arc::clone(&queue));
1817        }
1818
1819        // REP-9 Part 1: wire an ack sink so the FeederRunner forwards every
1820        // inbound replica ack to `env.record_ack(vlsn, replica_name)`, which
1821        // reaches BOTH the AckTracker (commit-blocking quorum) and the
1822        // matching `Feeder::acked_vlsn` (DTVLSN ranking).  Without this the
1823        // ack reached only the runner's private `known_replica_vlsn`.  The
1824        // sink holds a `Weak<Self>` so it never extends the env's lifetime;
1825        // if `self_weak` was never initialised we fall back to the plain
1826        // (sink-less) runner — `record_ack` is still reachable from tests.
1827        let runner = match self.self_weak.get().and_then(Weak::upgrade) {
1828            Some(env_arc) => {
1829                let weak = Arc::downgrade(&env_arc);
1830                let sink: crate::stream::feeder::AckSink =
1831                    Arc::new(move |name: &str, vlsn: u64| {
1832                        if let Some(env) = weak.upgrade() {
1833                            env.record_ack(vlsn, name);
1834                        }
1835                    });
1836                Arc::new(FeederRunner::new_with_ack_sink(
1837                    Arc::clone(&channel),
1838                    1,
1839                    replica_name.clone(),
1840                    sink,
1841                ))
1842            }
1843            None => Arc::new(FeederRunner::new(Arc::clone(&channel), 1)),
1844        };
1845        let runner_clone = Arc::clone(&runner);
1846        let replica_clone = replica_name.clone();
1847
1848        // C-C2b: prefer EnvironmentLogScanner (WAL auto-feed) when env is
1849        // wired; fall back to in-memory queue (manual replicate_entry path)
1850        // otherwise.
1851        let env_opt = self.env_impl.lock().unwrap().clone();
1852
1853        let handle = std::thread::Builder::new()
1854            .name(format!("noxu-feeder-{}", replica_name))
1855            .spawn(move || {
1856                if let Some(env) = env_opt {
1857                    if let Some(mut scanner) =
1858                        EnvironmentLogScanner::new(&env, None)
1859                    {
1860                        log::info!(
1861                            "FeederRunner for replica '{}': using \
1862                             EnvironmentLogScanner (WAL auto-feed)",
1863                            replica_clone,
1864                        );
1865                        let _ = runner_clone.run(&mut scanner);
1866                    } else {
1867                        log::warn!(
1868                            "FeederRunner for replica '{}': \
1869                             EnvironmentLogScanner unavailable, \
1870                             falling back to in-memory queue",
1871                            replica_clone,
1872                        );
1873                        let mut source = PeerScannerAdapter::new(queue, 0);
1874                        let _ = runner_clone.run(&mut source);
1875                    }
1876                } else {
1877                    let mut source = PeerScannerAdapter::new(queue, 0);
1878                    let _ = runner_clone.run(&mut source);
1879                }
1880                log::debug!(
1881                    "FeederRunner for replica '{}' exited cleanly",
1882                    replica_clone
1883                );
1884            })
1885            .expect("failed to spawn FeederRunner thread");
1886
1887        {
1888            let mut runners = self.active_feeder_runners.lock().unwrap();
1889            runners.insert(replica_name.clone(), Arc::clone(&runner));
1890        }
1891        self.io_threads.lock().unwrap().push(handle);
1892
1893        log::info!(
1894            "Node '{}' (master): FeederRunner thread spawned for replica '{}'",
1895            self.config.node_name.as_str(),
1896            replica_name,
1897        );
1898    }
1899
1900    // -----------------------------------------------------------------------
1901
1902    /// Bootstrap this node's environment by network-restoring all `.ndb`
1903    /// files from `peer_name` via the dispatcher's RESTORE service.
1904    ///
1905    /// Closes findings F2 / F4 of the 2026 review.
1906    ///
1907    /// The standalone `NetworkRestore::execute()` opens raw TCP and
1908    /// expects to drive the legacy `NetworkRestoreServer::start` listener.
1909    /// Production replicated environments host the RESTORE handler on the
1910    /// dispatcher, so this method routes through `execute_via_dispatcher`.
1911    ///
1912    /// `peer_name` must be a known peer in `GroupService`; on success the
1913    /// peer's `.ndb` files are written into `config.env_home`.  Returns
1914    /// `Err` if `env_home` is `None`, the peer is unknown, or the restore
1915    /// fails for any reason.
1916    pub fn bootstrap_via_dispatcher(&self, peer_name: &str) -> Result<()> {
1917        let env_home = self.config.env_home.clone().ok_or_else(|| {
1918            RepError::ConfigError(
1919                "bootstrap_via_dispatcher requires env_home in RepConfig"
1920                    .into(),
1921            )
1922        })?;
1923        let peer_info = self
1924            .group_service
1925            .get_all_nodes()
1926            .into_iter()
1927            .find(|n| n.name == peer_name)
1928            .ok_or_else(|| {
1929                RepError::ConfigError(format!(
1930                    "peer '{}' not registered in group '{}'",
1931                    peer_name, self.config.group_name,
1932                ))
1933            })?;
1934
1935        let cfg = NetworkRestoreConfig {
1936            source_node: peer_info.name.clone(),
1937            source_host: peer_info.host.clone(),
1938            source_port: peer_info.port,
1939            retain_log_files: true,
1940        };
1941        let restore = NetworkRestore::new(cfg).with_local_dir(env_home);
1942        restore.execute_via_dispatcher()?;
1943        log::info!(
1944            "Node '{}' bootstrapped via dispatcher from '{}' ({}:{})",
1945            self.config.node_name,
1946            peer_info.name,
1947            peer_info.host,
1948            peer_info.port,
1949        );
1950        Ok(())
1951    }
1952
1953    /// Get replication statistics.
1954    ///
1955    ///
1956    ///
1957    /// Returns statistics associated with this environment.
1958    pub fn get_stats(&self) -> &RepStats {
1959        &self.stats
1960    }
1961
1962    /// Get the ack tracker.
1963    pub fn get_ack_tracker(&self) -> &AckTracker {
1964        &self.ack_tracker
1965    }
1966
1967    /// Ensure the node state machine is in Unknown state, transitioning
1968    /// from Detached if necessary. This is needed because the state machine
1969    /// only allows Detached -> Unknown -> Master/Replica.
1970    pub fn ensure_unknown_state(&self) -> Result<()> {
1971        let current = self.node_state.get_state();
1972        match current {
1973            NodeState::Unknown => Ok(()),
1974            NodeState::Detached => {
1975                self.node_state.transition_to(NodeState::Unknown)?;
1976                Ok(())
1977            }
1978            // Master and Replica must transition through Unknown before
1979            // joining a new group or reconnecting.
1980            NodeState::Master | NodeState::Replica => {
1981                self.node_state.transition_to(NodeState::Unknown)?;
1982                Ok(())
1983            }
1984            NodeState::Shutdown => {
1985                Err(RepError::StateError("Node is shut down".to_string()))
1986            }
1987        }
1988    }
1989
1990    /// Transition to master state.
1991    ///
1992    /// Transitions this node to Master state for the given election term.
1993    /// As master, the node can accept write operations and feed log entries
1994    /// to replicas.
1995    ///
1996    /// **Active push-feeder** (C-C2): if feeder channels have been registered
1997    /// via [`Self::register_feeder_channel`] before this call, a
1998    /// [`FeederRunner`] background thread is spawned per channel.
1999    ///
2000    /// **WAL-scanner auto-feed path (C-C2b, v3.3.0)**: when
2001    /// [`Self::with_environment`] has been called before `become_master`,
2002    /// each `FeederRunner` thread uses an [`EnvironmentLogScanner`] as its
2003    /// source.  Every `log_txn_commit` on the master writes a VLSN-tagged
2004    /// 22-byte WAL entry (via `LogManager::log_with_vlsn`); the scanner
2005    /// discovers these entries and streams them to replicas automatically,
2006    /// without any [`Self::replicate_entry`] call from the application.
2007    ///
2008    /// **Fallback path**: when no `EnvironmentImpl` is wired, the runner
2009    /// reads from the in-memory queue populated by [`Self::replicate_entry`] /
2010    /// [`Self::apply_entry`].
2011    ///
2012    /// If no feeder channels are registered, this call registers per-replica
2013    /// `Feeder` tracker structs for `AckTracker` bookkeeping only.  In that
2014    /// case replicas must connect proactively to the `PEER_FEEDER` pull
2015    /// service to receive entries.
2016    pub fn become_master(&self, term: u64) -> Result<()> {
2017        if self.is_shutdown() {
2018            return Err(RepError::StateError(
2019                "Cannot become master: environment is closed".to_string(),
2020            ));
2021        }
2022
2023        // JE invariant: only `Electable` nodes can become master.  `Secondary`,
2024        // `Monitor`, and `Arbiter` are not electable and must be rejected at
2025        // the API layer (mirrors JE `ExceptionTest`).  See
2026        // `NodeType::can_be_master`.
2027        if !self.config.node_type.can_be_master() {
2028            return Err(RepError::InvalidStateTransition(format!(
2029                "node '{}' has type {} which is not electable as master",
2030                self.config.node_name.as_str(),
2031                self.config.node_type,
2032            )));
2033        }
2034
2035        // Ensure we can reach Master state (may need Detached -> Unknown first)
2036        self.ensure_unknown_state()?;
2037
2038        let old_state = self.node_state.get_state();
2039        self.node_state.transition_to(NodeState::Master)?;
2040        self.master_tracker.set_master(self.config.node_name.as_str(), term);
2041
2042        // --- F9: spawn Feeder trackers for each known replica -------------
2043        //
2044        // Closes finding F9 of the 2026 review.
2045        // The architecture is pull-based: replicas pull from the master's
2046        // `PEER_FEEDER` service via `catch_up_from_peer`.  However, the
2047        // master must:
2048        //   1. Track each replica via a `Feeder` so AckTracker bookkeeping
2049        //      can attribute replica acks to the right node.
2050        //   2. Push its own writes into `peer_scanner` so replicas pulling
2051        //      from `PEER_FEEDER` actually receive entries (`replicate_entry`).
2052        //
2053        // Here we ensure step 1: every known electable peer in the group
2054        // gets a `Feeder` entry.
2055        {
2056            let mut feeders = self.feeders.write();
2057            // Drop any stale feeders left over from a prior role.  A
2058            // `Feeder` is just an in-memory tracker; recreating it is
2059            // cheap and avoids state inversion bugs across role changes.
2060            feeders.clear();
2061            for peer in self.group_service.get_all_nodes() {
2062                if peer.name == self.config.node_name {
2063                    continue;
2064                }
2065                if peer.node_type != crate::node_type::NodeType::Electable
2066                    && peer.node_type != crate::node_type::NodeType::Secondary
2067                {
2068                    // Arbiters do not receive log entries.
2069                    continue;
2070                }
2071                feeders.push(Feeder::new(peer.name.clone()));
2072                log::debug!(
2073                    "Node '{}' (master, term={}): registered Feeder for \
2074                     replica '{}'",
2075                    self.config.node_name.as_str(),
2076                    term,
2077                    peer.name,
2078                );
2079            }
2080        }
2081
2082        // For observability, log the count.
2083        log::info!(
2084            "Node '{}' became master for term {} \
2085             (feeder trackers: {} known replicas)",
2086            self.config.node_name.as_str(),
2087            term,
2088            self.feeders.read().len(),
2089        );
2090
2091        // C-C2: spawn FeederRunner threads for pre-registered channels.
2092        //
2093        // When `register_feeder_channel` was called before `become_master`,
2094        // the channels are already in `feeder_channels`. Drain them and
2095        // spawn a FeederRunner per replica.  The FeederRunner reads from a
2096        // dedicated `PeerLogScanner` queue (populated by `replicate_entry`
2097        // fan-out) and pushes framed log entries to the replica over the
2098        // registered channel.  Acks from the replica are tracked in the
2099        // FeederRunner and visible via `active_feeder_runner_acked_vlsn`.
2100        {
2101            let channels: Vec<(String, Arc<dyn crate::net::Channel>)> = self
2102                .feeder_channels
2103                .lock()
2104                .unwrap()
2105                .iter()
2106                .map(|(k, v)| (k.clone(), Arc::clone(v)))
2107                .collect();
2108            for (replica_name, channel) in channels {
2109                self.spawn_feeder_runner(replica_name, channel);
2110            }
2111        }
2112
2113        // --- WAL-backed PEER_FEEDER for pull-path replicas -------------------
2114        //
2115        // The master's writes go to its WAL (VLSN-tagged 22-byte headers) and
2116        // its VLSN index, but NOT necessarily to the in-memory `peer_scanner`
2117        // (e.g. `register_vlsn_typed` only updates the index).  A replica that
2118        // pulls via the `PEER_FEEDER` service therefore finds an empty
2119        // in-memory scanner and gets `NeedsRestore`.
2120        //
2121        // Re-register PEER_FEEDER with a WAL-backed source so a pulling
2122        // replica receives the VLSN-tagged stream straight from the master's
2123        // OWN WAL via the same `EnvironmentLogScanner` + `FeederRunner` used
2124        // throughout.  Faithful to JE `MasterFeederSource(repImpl, vlsnIndex,
2125        // startVLSN)`, which reads the VLSNIndex + log regardless of node
2126        // role; `FeederManager` runs feeders on whatever node holds the data.
2127        // (The same registration runs, gated on `cascade_feeding`, in
2128        // `become_replica` so a mid-tier replica can cascade downstream.)
2129        if let Some(env) = self.env_impl.lock().unwrap().clone()
2130            && let Some(ref dispatcher) = self.tcp_dispatcher
2131        {
2132            let wal_source = crate::stream::peer_feeder::WalFeederSource::new(
2133                Arc::clone(&env),
2134                Arc::clone(&self.vlsn_index),
2135            );
2136            let svc = PeerFeederService::with_wal_source_counted(
2137                Arc::clone(&self.peer_scanner),
2138                wal_source,
2139                Arc::clone(&self.wal_feeds_served),
2140            );
2141            dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(svc));
2142            log::debug!(
2143                "Node '{}' (master): PEER_FEEDER now serves replicas from \
2144                 its own WAL",
2145                self.config.node_name.as_str(),
2146            );
2147        }
2148
2149        // -------------------------------------------------------------------
2150
2151        // Notify listeners
2152        self.notify_listeners(old_state, NodeState::Master);
2153
2154        Ok(())
2155    }
2156
2157    /// Transition to replica state with the given master.
2158    ///
2159    /// Transitions this node to Replica state. The node will receive log
2160    /// entries from the specified master.
2161    ///
2162    /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
2163    /// the method prepares an `EnvironmentLogWriter` so that replicated
2164    /// entries can be written to the local log.  The actual network connection
2165    /// is established by the `TcpServiceDispatcher`; this method logs intent.
2166    ///
2167    /// In HA.
2168    pub fn become_replica(&self, master_name: &str) -> Result<()> {
2169        if self.is_shutdown() {
2170            return Err(RepError::StateError(
2171                "Cannot become replica: environment is closed".to_string(),
2172            ));
2173        }
2174
2175        // Ensure we can reach Replica state (may need Detached -> Unknown first)
2176        self.ensure_unknown_state()?;
2177
2178        let old_state = self.node_state.get_state();
2179        self.node_state.transition_to(NodeState::Replica)?;
2180        self.master_tracker.set_master(master_name, 0);
2181        self.replica_stream.set_master(master_name);
2182        self.replica_stream.set_state(
2183            crate::stream::replica_stream::ReplicaStreamState::Connecting,
2184        );
2185
2186        // --- G19: start replica receive loop --------------------------------
2187        //
2188        // Connects to the master's PEER_FEEDER service and runs a
2189        // ReplicaReceiver loop in a background thread.  The receiver writes
2190        // replicated entries via EnvironmentLogWriter.
2191        if let Some(env) = self.env_impl.lock().unwrap().clone() {
2192            if let Some(log_mgr) = env.get_log_manager() {
2193                // REP-6: feed the env's SHARED, persisted VLSN index (the one
2194                // flush_to_disk persists and get_vlsn_range / election ranking
2195                // read) into the replica receive loop — NOT a throwaway. Using
2196                // a fresh index would leave the persisted vlsn.idx, the
2197                // reported VLSN range, and the DTVLSN-ranking own_vlsn lagging
2198                // the actually-received stream, widening catch-up (or forcing
2199                // an unnecessary network restore) after a clean restart.
2200                // JE: the replica's VLSNIndex IS the environment's persisted
2201                // index (see VLSNIndex).
2202                let vlsn_index = Arc::clone(&self.vlsn_index);
2203
2204                // --- Chained replication: start a WAL-backed feeder source ---
2205                //
2206                // When `cascade_feeding` is enabled, re-register this node's
2207                // PEER_FEEDER service with a WAL-backed source so a DOWNSTREAM
2208                // replica can connect and receive the VLSN-tagged log stream
2209                // FROM THIS REPLICA's OWN WAL (the bytes it received + persisted
2210                // via EnvironmentLogWriter::log_with_vlsn).  The feeder uses the
2211                // same EnvironmentLogScanner + FeederRunner the master uses.
2212                //
2213                // Faithful to JE's cascading-feeder model: the same
2214                // FeederManager/Feeder/FeederSource machinery runs on any node
2215                // that holds the data.  `FeederSource` is documented as "a real
2216                // Master OR a Replica in a Replica chain that is replaying log
2217                // records it received from some other source"
2218                // (`FeederSource.java`); `MasterFeederSource(repImpl, vlsnIndex,
2219                // startVLSN)` reads the VLSNIndex + log regardless of role.
2220                //
2221                // Default OFF (master-direct) preserves current behaviour: a
2222                // replica's PEER_FEEDER stays backed by the in-memory pull
2223                // scanner unless cascade is explicitly enabled.
2224                if self.config.cascade_feeding {
2225                    if let Some(ref dispatcher) = self.tcp_dispatcher {
2226                        let wal_source =
2227                            crate::stream::peer_feeder::WalFeederSource::new(
2228                                Arc::clone(&env),
2229                                Arc::clone(&self.vlsn_index),
2230                            );
2231                        let svc = PeerFeederService::with_wal_source_counted(
2232                            Arc::clone(&self.peer_scanner),
2233                            wal_source,
2234                            Arc::clone(&self.wal_feeds_served),
2235                        );
2236                        dispatcher
2237                            .register(PEER_FEEDER_SERVICE_NAME, Arc::new(svc));
2238                        log::info!(
2239                            "Node '{}' (replica): cascade feeding ENABLED — \
2240                             PEER_FEEDER now serves downstream replicas from \
2241                             its own WAL via the SAME FeederRunner + \
2242                             EnvironmentLogScanner mechanism the master uses \
2243                             (JE Feeder + MasterFeederSource)",
2244                            self.config.node_name.as_str(),
2245                        );
2246                    } else {
2247                        log::warn!(
2248                            "Node '{}': cascade_feeding set but no TCP \
2249                             dispatcher; downstream replicas cannot connect",
2250                            self.config.node_name.as_str(),
2251                        );
2252                    }
2253                }
2254
2255                // Resolve the master's socket address from the GroupService.
2256                let master_addr_opt: Option<SocketAddr> = self
2257                    .group_service
2258                    .get_all_nodes()
2259                    .iter()
2260                    .find(|n| n.name == master_name)
2261                    .and_then(|info| {
2262                        format!("{}:{}", info.host, info.port)
2263                            .parse::<SocketAddr>()
2264                            .ok()
2265                    });
2266
2267                let node_name = self.config.node_name.clone();
2268                let master = master_name.to_string();
2269                let vlsn_index_clone = Arc::clone(&vlsn_index);
2270                // Live shutdown flag (shared Arc): the receive loop polls it
2271                // so `close()` can break the blocking upstream receive and
2272                // join this thread — vital for a mid-tier replica in a chain
2273                // that is closed before its upstream feeder.
2274                let shutdown = Arc::clone(&self.io_shutdown);
2275                // Wave 9-A fix 2: capture a Weak<Self> so the I/O thread
2276                // can call `bootstrap_via_dispatcher` automatically when
2277                // the master signals `NeedsRestore`.  When the env was
2278                // never registered with `init_self_weak` (raw
2279                // `Arc::new(Self::new(...))` without going through
2280                // `open()` or the test harness), the weak ref is `None`
2281                // and we fall back to operator-driven bootstrap.
2282                let self_weak: Option<Weak<Self>> =
2283                    self.self_weak.get().cloned();
2284
2285                // REP-7 (B): clone the live EnvironmentImpl into the replica
2286                // thread so the writer can drive a ReplicaReplay that applies
2287                // each streamed entry to the live in-memory tree.
2288                let env_for_replay = Arc::clone(&env);
2289
2290                // REP-10 (C): build the ReplicaReplay HERE (not inside the
2291                // closure) so we can publish its REP-7 `last_applied_vlsn`
2292                // handle to a ConsistencyTracker BEFORE the thread starts
2293                // streaming.  A read on this replica then waits on the same
2294                // handle the replay thread advances.  Port of
2295                // RepImpl.getConsistency / Replica.getConsistencyTracker.
2296                let replay = noxu_dbi::ReplicaReplay::new(env_for_replay);
2297                let tracker = crate::ConsistencyTracker::new(
2298                    replay.last_applied_vlsn_handle(),
2299                );
2300                *self.consistency_tracker.lock().unwrap() = Some(tracker);
2301
2302                let handle = std::thread::Builder::new()
2303                    .name(format!("noxu-replica-{}", node_name))
2304                    .spawn(move || {
2305                        // REP-7 (B): wire the live replay-apply path so reads
2306                        // on the replica see replicated data without a
2307                        // restart.  JE: the replica writes each entry to its
2308                        // log, then Replay.replayEntry applies it to the tree.
2309                        let mut writer = EnvironmentLogWriter::with_replay(
2310                            log_mgr,
2311                            vlsn_index_clone,
2312                            replay,
2313                        );
2314
2315                        let Some(addr) = master_addr_opt else {
2316                            log::warn!(
2317                                "noxu-replica-{}: master '{}' address not in RepGroup; \
2318                                 waiting for TCP dispatcher connection",
2319                                node_name, master,
2320                            );
2321                            return;
2322                        };
2323
2324                        // Catch-up loop: catch up, observe NeedsRestore,
2325                        // optionally auto-bootstrap, retry once.  We cap
2326                        // the retry count at MAX_AUTO_BOOTSTRAP_ATTEMPTS
2327                        // (small) so a misbehaving master does not loop
2328                        // forever consuming network bandwidth.
2329                        const MAX_AUTO_BOOTSTRAP_ATTEMPTS: u32 = 2;
2330                        let mut attempts: u32 = 0;
2331                        loop {
2332                            // Observe close before (re)connecting so a
2333                            // shutdown between catch-up attempts exits
2334                            // promptly.
2335                            if shutdown.load(Ordering::SeqCst) {
2336                                return;
2337                            }
2338                            log::info!(
2339                                "noxu-replica-{}: connecting to master '{}' at {}",
2340                                node_name, master, addr,
2341                            );
2342                            match crate::stream::peer_feeder::catch_up_from_peer_until(
2343                                addr, 0, &mut writer, &shutdown,
2344                            ) {
2345                                Ok(true) => {
2346                                    log::info!(
2347                                        "noxu-replica-{}: catch-up complete from '{}'",
2348                                        node_name, master,
2349                                    );
2350                                    return;
2351                                }
2352                                Ok(false) => {
2353                                    // F2/F4: master signals NeedsRestore.
2354                                    // Wave 9-A fix 2: if a Weak<Self> was
2355                                    // plumbed in, upgrade it and call
2356                                    // `bootstrap_via_dispatcher` ourselves
2357                                    // so the replica auto-bootstraps and
2358                                    // resumes catch-up without operator
2359                                    // intervention.
2360                                    log::warn!(
2361                                        "noxu-replica-{}: master '{}' requires restore",
2362                                        node_name, master,
2363                                    );
2364                                    attempts += 1;
2365                                    if attempts > MAX_AUTO_BOOTSTRAP_ATTEMPTS {
2366                                        log::error!(
2367                                            "noxu-replica-{}: exceeded \
2368                                             auto-bootstrap attempts ({}); giving up",
2369                                            node_name,
2370                                            MAX_AUTO_BOOTSTRAP_ATTEMPTS,
2371                                        );
2372                                        return;
2373                                    }
2374                                    let env_arc = match self_weak
2375                                        .as_ref()
2376                                        .and_then(Weak::upgrade)
2377                                    {
2378                                        Some(e) => e,
2379                                        None => {
2380                                            // No back-ref or env dropped:
2381                                            // fall back to operator-driven
2382                                            // bootstrap and exit cleanly.
2383                                            log::warn!(
2384                                                "noxu-replica-{}: no back-reference \
2385                                                 available; operator must call \
2386                                                 bootstrap_via_dispatcher manually",
2387                                                node_name,
2388                                            );
2389                                            return;
2390                                        }
2391                                    };
2392                                    if env_arc.is_shutdown() {
2393                                        return;
2394                                    }
2395                                    log::info!(
2396                                        "noxu-replica-{}: auto-bootstrapping via \
2397                                         dispatcher from '{}' (attempt {})",
2398                                        node_name, master, attempts,
2399                                    );
2400                                    match env_arc
2401                                        .bootstrap_via_dispatcher(&master)
2402                                    {
2403                                        Ok(()) => {
2404                                            log::info!(
2405                                                "noxu-replica-{}: auto-bootstrap \
2406                                                 succeeded; resuming catch-up",
2407                                                node_name,
2408                                            );
2409                                            // Drop the strong ref before
2410                                            // re-entering catch-up so we
2411                                            // do not keep the env alive
2412                                            // longer than necessary.
2413                                            drop(env_arc);
2414                                            continue;
2415                                        }
2416                                        Err(e) => {
2417                                            log::error!(
2418                                                "noxu-replica-{}: auto-bootstrap \
2419                                                 failed: {}",
2420                                                node_name, e,
2421                                            );
2422                                            return;
2423                                        }
2424                                    }
2425                                }
2426                                Err(e) => {
2427                                    if !shutdown.load(Ordering::SeqCst) {
2428                                        log::error!(
2429                                            "noxu-replica-{}: error from master '{}': {e}",
2430                                            node_name, master,
2431                                        );
2432                                    }
2433                                    return;
2434                                }
2435                            }
2436                        }
2437                    })
2438                    .expect("failed to spawn noxu-replica thread");
2439
2440                self.io_threads.lock().unwrap().push(handle);
2441
2442                log::debug!(
2443                    "Node '{}': replica receive thread started for master '{}'",
2444                    self.config.node_name.as_str(),
2445                    master_name,
2446                );
2447            } else {
2448                log::warn!(
2449                    "Node '{}': no LogManager available (read-only env?); \
2450                     replica I/O loop not started",
2451                    self.config.node_name.as_str(),
2452                );
2453            }
2454        }
2455        // -------------------------------------------------------------------
2456
2457        // Notify listeners
2458        self.notify_listeners(old_state, NodeState::Replica);
2459
2460        log::info!(
2461            "Node '{}' became replica of master '{}'",
2462            self.config.node_name.as_str(),
2463            master_name
2464        );
2465        Ok(())
2466    }
2467
2468    /// Initiate a master transfer to the target node.
2469    ///
2470    ///
2471    ///
2472    /// Transfers the current master state from this node to one of the
2473    /// electable replicas. The replica that is actually chosen to be the new
2474    /// master is the one with which the Master Transfer can be completed most
2475    /// rapidly. The transfer operation ensures that all changes at this node
2476    /// are available at the new master upon conclusion of the operation.
2477    pub fn transfer_master(&self, config: MasterTransferConfig) -> Result<()> {
2478        if self.is_shutdown() {
2479            return Err(RepError::StateError(
2480                "Cannot transfer master: environment is closed".to_string(),
2481            ));
2482        }
2483
2484        if !self.is_master() {
2485            return Err(RepError::InvalidState(
2486                "Master transfer can only be initiated on the master node"
2487                    .to_string(),
2488            ));
2489        }
2490
2491        log::info!(
2492            "Node '{}' initiating master transfer to '{}'",
2493            self.config.node_name.as_str(),
2494            config.target_node,
2495        );
2496
2497        // Closes finding F7 of the 2026 review.
2498        //
2499        // Steps:
2500        //   1. Locate the target's address.
2501        //   2. Compute the new term (current observed term + 1).
2502        //   3. Send TRANSFER_MASTER to the target — it will become master.
2503        //   4. Send TRANSFER_MASTER (with the same term + new master name) to
2504        //      every other peer so they re-target.
2505        //   5. Demote self to Replica of the target.
2506        //
2507        // The transfer is best-effort: a peer that doesn't ack is logged
2508        // and skipped.  The election driver will reconcile any divergence
2509        // on the next election round.
2510
2511        let target_addr = self
2512            .group_service
2513            .get_all_nodes()
2514            .into_iter()
2515            .find(|n| n.name == config.target_node)
2516            .and_then(|n| {
2517                format!("{}:{}", n.host, n.port)
2518                    .parse::<std::net::SocketAddr>()
2519                    .ok()
2520            })
2521            .ok_or_else(|| {
2522                RepError::ConfigError(format!(
2523                    "transfer_master: target '{}' not registered or has bad address",
2524                    config.target_node
2525                ))
2526            })?;
2527
2528        let new_term = self.master_tracker.get_term().saturating_add(1);
2529
2530        // 1. Tell the target to become master at the new term.
2531        let target_ack = crate::group_admin::send_transfer_master(
2532            target_addr,
2533            &config.target_node,
2534            new_term,
2535        )
2536        .map_err(|e| {
2537            RepError::NetworkError(format!(
2538                "transfer_master: failed to signal target '{}': {}",
2539                config.target_node, e
2540            ))
2541        })?;
2542        if !target_ack {
2543            return Err(RepError::StateError(format!(
2544                "transfer_master: target '{}' rejected the transfer",
2545                config.target_node
2546            )));
2547        }
2548
2549        // 2. Inform all other peers (best-effort).
2550        for peer in self.group_service.get_all_nodes() {
2551            if peer.name == self.config.node_name
2552                || peer.name == config.target_node
2553            {
2554                continue;
2555            }
2556            if let Ok(addr) = format!("{}:{}", peer.host, peer.port).parse() {
2557                let _ = crate::group_admin::send_transfer_master(
2558                    addr,
2559                    &config.target_node,
2560                    new_term,
2561                );
2562            }
2563        }
2564
2565        // 3. Demote self to Replica of the new master.
2566        self.become_replica(&config.target_node)?;
2567
2568        log::info!(
2569            "Node '{}' transferred master to '{}' at term {}",
2570            self.config.node_name.as_str(),
2571            config.target_node,
2572            new_term,
2573        );
2574        Ok(())
2575    }
2576
2577    /// Register a VLSN (as master, after writing a log entry).
2578    ///
2579    /// Maps the given VLSN to the specified log file position. This is called
2580    /// by the master after it writes a replicated log entry.
2581    pub fn register_vlsn(&self, vlsn: u64, file_number: u32, file_offset: u32) {
2582        self.vlsn_index.register(vlsn, file_number, file_offset);
2583    }
2584
2585    /// Register a VLSN→LSN mapping with its `LogEntryType`, so `lastSync` /
2586    /// `lastTxnEnd` advance (JE `VLSNRange.getUpdateForNewMapping`). Used by
2587    /// the syncup driver/tests that apply VLSN-tagged entries to a real log
2588    /// and need the sync/commit boundaries to track the stream.
2589    pub fn register_vlsn_typed(
2590        &self,
2591        vlsn: u64,
2592        file_number: u32,
2593        file_offset: u32,
2594        entry_type: noxu_log::LogEntryType,
2595    ) {
2596        self.vlsn_index.register_with_type(
2597            vlsn,
2598            file_number,
2599            file_offset,
2600            entry_type,
2601        );
2602    }
2603
2604    /// Replicate a freshly committed log entry from the master.
2605    ///
2606    /// Closes finding F9 of the 2026 review.
2607    ///
2608    /// Combines `register_vlsn` with a push into the in-memory
2609    /// `peer_scanner` so that downstream replicas pulling from this
2610    /// node's `PEER_FEEDER` service (via `catch_up_from_peer`) can
2611    /// stream the entry without round-tripping through the on-disk
2612    /// log.  The local log is still the source of truth; the peer
2613    /// scanner is a fast-path cache that bounds itself via
2614    /// `PeerLogScanner::with_capacity` so old entries are evicted.
2615    ///
2616    /// Should be called by the master after the local commit has
2617    /// fsynced.  Calling on a non-master is harmless (the peer
2618    /// scanner cache is also used by replicas) but is logged at trace
2619    /// level for diagnostics.
2620    pub fn replicate_entry(
2621        &self,
2622        vlsn: u64,
2623        file_number: u32,
2624        file_offset: u32,
2625        entry_type: u8,
2626        data: Vec<u8>,
2627    ) {
2628        // Register VLSN -> LSN, dispatching entry type so lastSync /
2629        // lastTxnEnd advance (REP-5; JE VLSNRange.getUpdateForNewMapping).
2630        // An unknown type byte falls back to extend-only registration.
2631        match noxu_log::LogEntryType::from_type_num(entry_type) {
2632            Some(et) => self.vlsn_index.register_with_type(
2633                vlsn,
2634                file_number,
2635                file_offset,
2636                et,
2637            ),
2638            None => self.vlsn_index.register(vlsn, file_number, file_offset),
2639        }
2640        // Pull path: shared peer_scanner serves replicas connecting via
2641        // PeerFeederService (catch_up_from_peer).
2642        self.peer_scanner.push(vlsn, entry_type, data.clone());
2643        // Push path (C-C2): fan out to per-replica FeederRunner queues so
2644        // that threads spawned by become_master can stream entries to each
2645        // registered replica without competing with PeerFeederService.
2646        {
2647            let queues = self.feeder_queues.read().unwrap();
2648            for queue in queues.values() {
2649                queue.push(vlsn, entry_type, data.clone());
2650            }
2651        }
2652        if !self.is_master() {
2653            log::trace!(
2654                "replicate_entry called on non-master node '{}': vlsn={}, type={}",
2655                self.config.node_name,
2656                vlsn,
2657                entry_type,
2658            );
2659        }
2660    }
2661
2662    /// Apply a replicated entry (as replica).
2663    ///
2664    /// Applies a log entry received from the master. This is called by the
2665    /// replica stream handler after receiving an entry from the feeder.
2666    ///
2667    /// `data` is the wire-encoded log-record payload.  When the
2668    /// replicated environment has not been wired to a local
2669    /// `noxu_db::Environment` (i.e., before `with_environment` is
2670    /// called) the payload is forwarded into the in-memory peer
2671    /// scanner so that downstream replicas attached to the
2672    /// `PEER_FEEDER` service can re-stream it; the local log is **not**
2673    /// updated.  This is documented behaviour rather than a stub — see
2674    /// the 2026 review finding #26 (medium) for the
2675    /// `with_environment`-required local-apply path.
2676    /// cleanup (rep info F35: `_data` placeholder) renames the leading
2677    /// underscore so reviewers don't read it as a TODO.
2678    pub fn apply_entry(
2679        &self,
2680        vlsn: u64,
2681        entry_type: u8,
2682        data: Vec<u8>,
2683    ) -> Result<()> {
2684        if self.is_shutdown() {
2685            return Err(RepError::StateError(
2686                "Cannot apply entry: environment is closed".to_string(),
2687            ));
2688        }
2689
2690        // Register the VLSN in the index, dispatching entry type so
2691        // lastSync/lastTxnEnd advance (REP-5; JE
2692        // VLSNRange.getUpdateForNewMapping).
2693        match noxu_log::LogEntryType::from_type_num(entry_type) {
2694            Some(et) => self.vlsn_index.register_with_type(vlsn, 0, 0, et),
2695            None => self.vlsn_index.register(vlsn, 0, 0),
2696        }
2697
2698        // Push into the peer log scanner so downstream replicas can
2699        // receive this entry via the PEER_FEEDER service.
2700        self.peer_scanner.push(vlsn, entry_type, data.clone());
2701        // C-C2 push path: fan out to per-replica FeederRunner queues.
2702        {
2703            let queues = self.feeder_queues.read().unwrap();
2704            for queue in queues.values() {
2705                queue.push(vlsn, entry_type, data.clone());
2706            }
2707        }
2708
2709        log::trace!(
2710            "Applied replicated entry: vlsn={}, type={}",
2711            vlsn,
2712            entry_type
2713        );
2714        Ok(())
2715    }
2716
2717    /// Record an ack from a replica (as master).
2718    ///
2719    /// Records that the specified replica has acknowledged processing up to
2720    /// the given VLSN. This is used by the master to track durability
2721    /// guarantees.
2722    pub fn record_ack(&self, vlsn: u64, replica_name: &str) {
2723        // Only acks from ELECTABLE replicas count toward the durability
2724        // quorum (JE DurabilityQuorum.replicaAcksQualify: Monitors and
2725        // Secondaries do not qualify). An ack from a non-electable / unknown
2726        // node is recorded for stats elsewhere but must not satisfy the
2727        // ReplicaAckPolicy. If the node is unknown to the group view we err
2728        // toward NOT counting it.
2729        let qualifies = self
2730            .get_rep_group()
2731            .get_node(replica_name)
2732            .map(|n| n.node_type().is_electable())
2733            .unwrap_or(false);
2734        if qualifies {
2735            self.ack_tracker.record_ack(vlsn, replica_name);
2736        }
2737        // REP-9 Part 1: advance the matching `Feeder::acked_vlsn` high-water
2738        // mark (read by `update_dtvlsn_from_feeders` and exposed via
2739        // `get_acked_vlsn`).  The production `FeederRunner` previously updated
2740        // only its private `known_replica_vlsn`, so the DTVLSN ranking never
2741        // saw production progress (JE `Feeder.getReplicaTxnEndVLSN`).  We
2742        // record the high-water for *any* replica (electable or not); the
2743        // electable filter is reapplied when DTVLSN/quorum is computed.
2744        for feeder in self.feeders.read().iter() {
2745            if feeder.get_replica_name() == replica_name {
2746                feeder.record_ack(vlsn);
2747                break;
2748            }
2749        }
2750        // Recompute the DTVLSN from feeder progress whenever an ack lands.
2751        self.update_dtvlsn_from_feeders();
2752        // REP-9: wake any committer parked in `await_replica_acks`. Its
2753        // satisfaction predicate is the high-water feeder count, not an
2754        // exact-VLSN registration, so we must notify unconditionally (the
2755        // AckTracker's own `record_ack` only notifies when the exact VLSN was
2756        // registered, which the per-frame feeder acks generally are not).
2757        self.ack_tracker.notify_waiters();
2758    }
2759
2760    /// Returns the current Durable Transaction VLSN (D7, JE RepNode.getDTVLSN).
2761    /// The highest VLSN replicated to a majority of electable replicas; 0 if
2762    /// none yet. Used by the election ranking so the most-durable node wins.
2763    pub fn get_dtvlsn(&self) -> u64 {
2764        self.dtvlsn.load(std::sync::atomic::Ordering::Acquire)
2765    }
2766
2767    /// Advance the DTVLSN to `candidate` if it is greater (JE
2768    /// RepNode.updateDTVLSN — an `AtomicLongMax.updateMax`). The DTVLSN can
2769    /// only move forward. Returns the resulting (possibly unchanged) value.
2770    pub fn update_dtvlsn(&self, candidate: u64) -> u64 {
2771        use std::sync::atomic::Ordering;
2772        let mut cur = self.dtvlsn.load(Ordering::Acquire);
2773        while candidate > cur {
2774            match self.dtvlsn.compare_exchange_weak(
2775                cur,
2776                candidate,
2777                Ordering::AcqRel,
2778                Ordering::Acquire,
2779            ) {
2780                Ok(_) => return candidate,
2781                Err(observed) => cur = observed,
2782            }
2783        }
2784        cur
2785    }
2786
2787    /// Set the DTVLSN from the replication stream (JE RepNode.setDTVLSN —
2788    /// used exclusively by the replica, which maintains the DTVLSN from
2789    /// commit/abort records). Still enforced as advance-only via update_max so
2790    /// an out-of-order or stale record cannot move it backward.
2791    pub fn set_dtvlsn(&self, vlsn: u64) {
2792        self.update_dtvlsn(vlsn);
2793    }
2794
2795    /// Master-side DTVLSN computation (D7, JE FeederManager.updateDTVLSN):
2796    /// across the *qualifying* (electable) feeders whose replica-txn-end VLSN
2797    /// exceeds the current DTVLSN, take the minimum; once a SIMPLE_MAJORITY
2798    /// ack-count of them exceeds the current value, advance the DTVLSN to that
2799    /// minimum (a transaction is durable once a majority hold it).
2800    fn update_dtvlsn_from_feeders(&self) {
2801        if !self.is_master() {
2802            return;
2803        }
2804        let curr = self.get_dtvlsn();
2805
2806        // SIMPLE_MAJORITY required-ack-count over the electable group,
2807        // computed the same way as await_replica_acks.
2808        let group = self.get_rep_group();
2809        let electable_peers: u32 = group
2810            .get_nodes()
2811            .iter()
2812            .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2813            .count() as u32;
2814        let electable_count = electable_peers + 1; // +1 for self/master
2815        // required electable acks for SIMPLE_MAJORITY = floor(n/2) replicas
2816        // (the master self-acks; a majority is reached when this many peers
2817        // also hold the VLSN).
2818        let durable_ack_count = electable_count / 2;
2819        if durable_ack_count == 0 {
2820            // Single-node (or majority is self alone): the master's own log is
2821            // immediately durable up to its latest VLSN.
2822            self.update_dtvlsn(self.get_current_vlsn());
2823            return;
2824        }
2825
2826        let mut min = u64::MAX;
2827        let mut ack_count: u32 = 0;
2828        for feeder in self.feeders.read().iter() {
2829            // replicaAcksQualify: only electable feeders count (D6).
2830            let qualifies = group
2831                .get_node(&feeder.get_replica_name())
2832                .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2833                .unwrap_or(false);
2834            if !qualifies {
2835                continue;
2836            }
2837            let replica_vlsn = feeder.get_acked_vlsn();
2838            if replica_vlsn <= curr {
2839                continue;
2840            }
2841            if replica_vlsn < min {
2842                min = replica_vlsn;
2843            }
2844            ack_count += 1;
2845            if ack_count >= durable_ack_count {
2846                // A majority of electable replicas hold >= min: durable.
2847                self.update_dtvlsn(min);
2848                return;
2849            }
2850        }
2851        // DTVLSN unchanged.
2852    }
2853
2854    /// REP-9: count qualifying (electable) feeders whose acked high-water VLSN
2855    /// is `>= commit_vlsn`.  This is the Rust equivalent of JE
2856    /// `FeederManager.getNumCurrentAckFeeders(commitVLSN)` — the durability
2857    /// quorum is satisfied when this count reaches the required ack count.
2858    /// Only Electable replicas qualify (D6, JE
2859    /// `DurabilityQuorum.replicaAcksQualify`).
2860    fn count_ack_feeders_ge(&self, commit_vlsn: u64) -> u32 {
2861        let group = self.get_rep_group();
2862        let mut count = 0u32;
2863        for feeder in self.feeders.read().iter() {
2864            let qualifies = group
2865                .get_node(&feeder.get_replica_name())
2866                .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2867                .unwrap_or(false);
2868            // A feeder counts only if it has acked a *real* VLSN at or above
2869            // the commit VLSN.  `acked_vlsn == 0` is the NULL sentinel (no ack
2870            // yet) and must never satisfy a commit, even when `commit_vlsn`
2871            // itself is 0 (no replicated commit logged) — mirrors JE
2872            // `getReplicaTxnEndVLSN()` returning NULL_VLSN for a fresh feeder,
2873            // which is not `>=` any commit VLSN.
2874            let acked = feeder.get_acked_vlsn();
2875            if qualifies && acked > 0 && acked >= commit_vlsn {
2876                count += 1;
2877            }
2878        }
2879        count
2880    }
2881
2882    /// Set the state change listener.
2883    ///
2884    ///
2885    ///
2886    /// Sets the listener used to receive asynchronous replication node state
2887    /// change events. Note that there is one listener per replication node,
2888    /// not one per handle. Invoking this method adds to the set of listeners.
2889    ///
2890    /// Invoking this method typically results in an immediate callback to the
2891    /// application via the `on_state_change` method, so that the application
2892    /// is made aware of the existing state of the node at the time the listener
2893    /// is first established.
2894    pub fn set_state_change_listener(
2895        &self,
2896        listener: Arc<dyn StateChangeListener>,
2897    ) {
2898        // Immediately notify the listener of the current state
2899        let current_state = self.node_state.get_state();
2900        let event = StateChangeEvent::new(
2901            current_state,
2902            current_state,
2903            self.get_master_name(),
2904        );
2905        listener.on_state_change(event);
2906
2907        let mut listeners = self.listeners.write();
2908        listeners.push(listener);
2909    }
2910
2911    /// Close the replicated environment.
2912    ///
2913    ///
2914    ///
2915    /// Closes this handle and releases any resources. When closed, daemon
2916    /// threads are stopped, even if they are performing work. The node ceases
2917    /// participation in the replication group. If the node was currently the
2918    /// master, the rest of the group will hold an election.
2919    ///
2920    /// The ReplicatedEnvironment should not be closed while any other type of
2921    /// handle that refers to it is not yet closed.
2922    pub fn close(&self) -> Result<()> {
2923        if self.shutdown.swap(true, Ordering::SeqCst) {
2924            // Already closed
2925            return Ok(());
2926        }
2927
2928        let old_state = self.node_state.get_state();
2929
2930        // Transition to Shutdown state. The state machine allows this from
2931        // any non-Shutdown state.
2932        let _ = self.node_state.transition_to(NodeState::Shutdown);
2933
2934        // Notify listeners of the shutdown
2935        self.notify_listeners(old_state, NodeState::Shutdown);
2936
2937        // Clear feeders
2938        {
2939            let mut feeders = self.feeders.write();
2940            feeders.clear();
2941        }
2942
2943        // C-C2: close all registered feeder channels so FeederRunner threads
2944        // observe ChannelClosed and exit their run() loops cleanly.
2945        {
2946            let channels = self.feeder_channels.lock().unwrap();
2947            for (name, ch) in channels.iter() {
2948                if let Err(e) = ch.close() {
2949                    log::debug!(
2950                        "close: feeder channel for '{}' already closed: {}",
2951                        name,
2952                        e
2953                    );
2954                }
2955            }
2956        }
2957        // Drop all active runners and queues so their Arcs release.
2958        self.active_feeder_runners.lock().unwrap().clear();
2959        self.feeder_queues.write().unwrap().clear();
2960
2961        // Signal and join all I/O threads spawned by become_master /
2962        // become_replica / start_vlsn_persistence_daemon.  The vlsn-flush
2963        // thread does a final flush on its way out so a clean close is
2964        // recoverable.  Closes finding F11.
2965        self.io_shutdown.store(true, Ordering::SeqCst);
2966        {
2967            let mut threads = self.io_threads.lock().unwrap();
2968            for handle in threads.drain(..) {
2969                let _ = handle.join();
2970            }
2971        }
2972
2973        // Belt-and-braces: even when no daemon is running (e.g.
2974        // `ReplicatedEnvironment::new` without `open`), persist a final
2975        // snapshot if env_home is configured.
2976        if let Some(ref home) = self.config.env_home
2977            && let Err(e) =
2978                crate::vlsn::persist::flush_to_disk(&self.vlsn_index, home)
2979        {
2980            log::warn!(
2981                "close: failed to persist VLSN index to {}: {}",
2982                home.display(),
2983                e
2984            );
2985        }
2986
2987        // Stop the service dispatcher (the: serviceDispatcher.shutdown()).
2988        if let Some(ref dispatcher) = self.tcp_dispatcher {
2989            dispatcher.stop();
2990            let kind = if dispatcher.is_tls() { "TLS" } else { "TCP" };
2991            log::debug!(
2992                "Node '{}' {} service dispatcher stopped",
2993                self.config.node_name.as_str(),
2994                kind,
2995            );
2996        }
2997
2998        log::info!(
2999            "Replicated environment '{}' in group '{}' closed",
3000            self.config.node_name.as_str(),
3001            self.config.group_name.as_str()
3002        );
3003
3004        Ok(())
3005    }
3006
3007    /// Close this handle and shut down the Replication Group by forcing all
3008    /// active Replicas to exit.
3009    ///
3010    ///
3011    ///
3012    /// This method must be invoked on the node that's currently the Master
3013    /// after all other outstanding handles have been closed.
3014    ///
3015    /// When push-feeder threads are active (registered via
3016    /// [`Self::register_feeder_channel`]), the master first waits up to half
3017    /// of `replica_shutdown_timeout_ms` for each FeederRunner replica to
3018    /// acknowledge all outstanding log entries (VLSN catch-up).  Replicas
3019    /// that do not catch up within the budget receive a warning; the master
3020    /// proceeds to send `SHUTDOWN_GROUP` regardless.  This closes finding M-4
3021    /// of the v3.x production-readiness review.
3022    ///
3023    /// Replicas that are not fed via a registered channel (pull-based
3024    /// `PeerFeederService` path) are sent `SHUTDOWN_GROUP` without a
3025    /// VLSN-level catch-up wait — that wait requires per-replica ack tracking
3026    /// which the pull path does not yet provide.
3027    pub fn shutdown_group(
3028        &self,
3029        replica_shutdown_timeout_ms: u64,
3030    ) -> Result<()> {
3031        if !self.is_master() {
3032            return Err(RepError::InvalidState(
3033                "shutdownGroup must be invoked on the master".to_string(),
3034            ));
3035        }
3036
3037        log::info!(
3038            "Node '{}' shutting down replication group '{}' (replica_timeout={}ms)",
3039            self.config.node_name.as_str(),
3040            self.config.group_name.as_str(),
3041            replica_shutdown_timeout_ms,
3042        );
3043
3044        // M-4: Wait for active FeederRunner replicas to ack the master's
3045        // current VLSN before sending SHUTDOWN_GROUP.  We allow up to half
3046        // the overall timeout for the catch-up phase so the second half
3047        // remains for the SHUTDOWN_GROUP send loop.
3048        let catchup_budget_ms = replica_shutdown_timeout_ms / 2;
3049        if catchup_budget_ms > 0 {
3050            let master_vlsn = self.vlsn_index.get_range().last();
3051            if master_vlsn > 0 {
3052                let runners: Vec<(String, Arc<FeederRunner>)> = self
3053                    .active_feeder_runners
3054                    .lock()
3055                    .unwrap()
3056                    .iter()
3057                    .map(|(k, v)| (k.clone(), Arc::clone(v)))
3058                    .collect();
3059                if !runners.is_empty() {
3060                    let catchup_deadline = std::time::Instant::now()
3061                        + Duration::from_millis(catchup_budget_ms);
3062                    for (name, runner) in &runners {
3063                        loop {
3064                            let acked = runner.known_replica_vlsn();
3065                            if acked >= master_vlsn
3066                                || std::time::Instant::now() >= catchup_deadline
3067                            {
3068                                if acked < master_vlsn {
3069                                    log::warn!(
3070                                        "shutdown_group: replica '{}' acked \
3071                                         VLSN {} < master VLSN {}; proceeding",
3072                                        name,
3073                                        acked,
3074                                        master_vlsn,
3075                                    );
3076                                } else {
3077                                    log::info!(
3078                                        "shutdown_group: replica '{}' caught up \
3079                                         to VLSN {}",
3080                                        name,
3081                                        acked,
3082                                    );
3083                                }
3084                                break;
3085                            }
3086                            std::thread::sleep(Duration::from_millis(10));
3087                        }
3088                    }
3089                }
3090            }
3091        }
3092
3093        // Closes finding F8 of the 2026 review.
3094        //
3095        // Send SHUTDOWN_GROUP to every known peer.  The recipient calls
3096        // its own `close()` and the per-connection ADMIN handler
3097        // returns ACK_OK.  Any peer that doesn't ack within the
3098        // timeout is logged and the master proceeds.  After signalling
3099        // every peer, the master closes its own env.
3100        let deadline = std::time::Instant::now()
3101            + Duration::from_millis(replica_shutdown_timeout_ms);
3102
3103        for peer in self.group_service.get_all_nodes() {
3104            if peer.name == self.config.node_name {
3105                continue;
3106            }
3107            // Don't exceed the deadline waiting for any single peer.
3108            let now = std::time::Instant::now();
3109            if now >= deadline {
3110                log::warn!(
3111                    "shutdown_group: deadline reached; skipping remaining peers"
3112                );
3113                break;
3114            }
3115            let addr_str = format!("{}:{}", peer.host, peer.port);
3116            let addr = match addr_str.parse::<SocketAddr>() {
3117                Ok(a) => a,
3118                Err(e) => {
3119                    log::warn!(
3120                        "shutdown_group: peer '{}' has bad address {}: {}",
3121                        peer.name,
3122                        addr_str,
3123                        e
3124                    );
3125                    continue;
3126                }
3127            };
3128            match crate::group_admin::send_shutdown_group(addr) {
3129                Ok(true) => log::info!(
3130                    "shutdown_group: peer '{}' acknowledged",
3131                    peer.name
3132                ),
3133                Ok(false) => log::warn!(
3134                    "shutdown_group: peer '{}' rejected the request",
3135                    peer.name
3136                ),
3137                Err(e) => log::warn!(
3138                    "shutdown_group: peer '{}' unreachable: {}",
3139                    peer.name,
3140                    e
3141                ),
3142            }
3143        }
3144
3145        // Master closes itself last.
3146        self.close()
3147    }
3148
3149    /// Check if shutdown is in progress.
3150    pub fn is_shutdown(&self) -> bool {
3151        self.shutdown.load(Ordering::SeqCst)
3152    }
3153
3154    /// Notify all registered listeners of a state change.
3155    fn notify_listeners(&self, old_state: NodeState, new_state: NodeState) {
3156        let listeners = self.listeners.read();
3157        if !listeners.is_empty() {
3158            let event = StateChangeEvent::new(
3159                old_state,
3160                new_state,
3161                self.get_master_name(),
3162            );
3163            for listener in listeners.iter() {
3164                listener.on_state_change(event.clone());
3165            }
3166        }
3167    }
3168}
3169
3170// ---------------------------------------------------------------------------
3171// F1: ReplicaAckCoordinator impl wires master commits into the AckTracker.
3172// ---------------------------------------------------------------------------
3173//
3174// `noxu_db::Transaction::commit_with_durability` calls
3175// `await_replica_acks` after the local WAL fsync.  This impl:
3176//
3177//   1. Rejects calls on a non-master node with `NotMaster`.
3178//   2. Rejects calls during shutdown with `Shutdown`.
3179//   3. Computes the required ack count from `electable_count` and the
3180//      requested policy.
3181//   4. Allocates a unique commit sequence number, registers the ack
3182//      requirement on the `AckTracker`, and polls `is_satisfied` with
3183//      a small sleep until either the timeout elapses or the policy
3184//      is satisfied.
3185//   5. Cleans up the tracker entry on every exit path.
3186//
3187// Closes finding F1 of the 2026 review.
3188impl ReplicaAckCoordinator for ReplicatedEnvironment {
3189    fn await_replica_acks(
3190        &self,
3191        policy: ReplicaAckPolicyKind,
3192        timeout: Duration,
3193    ) -> std::result::Result<u32, AckWaitError> {
3194        // Fast-path: ReplicaAckPolicy::None never blocks. The trait spec
3195        // says callers may already short-circuit, but be defensive.
3196        if matches!(policy, ReplicaAckPolicyKind::None) {
3197            return Ok(0);
3198        }
3199
3200        if self.is_shutdown() {
3201            return Err(AckWaitError {
3202                kind: AckWaitErrorKind::Shutdown,
3203                needed: 0,
3204                received: 0,
3205            });
3206        }
3207
3208        if !self.is_master() {
3209            return Err(AckWaitError {
3210                kind: AckWaitErrorKind::NotMaster,
3211                needed: 0,
3212                received: 0,
3213            });
3214        }
3215
3216        // Count electable peers (excluding the master) using the
3217        // RepGroup view, which counts Arbiters and Electables
3218        // identically. Only Electable nodes are counted as data
3219        // replicas able to ack a commit.  The master itself is
3220        // *implicit*: it is not registered in `group_service` (only
3221        // peers are), so we add 1 to obtain the total electable
3222        // count expected by `ReplicaAckPolicyKind::required_acks`.
3223        let group = self.get_rep_group();
3224        let electable_peers: u32 = group
3225            .get_nodes()
3226            .iter()
3227            .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
3228            .count() as u32;
3229        let electable_count: u32 = electable_peers + 1; // +1 for self/master
3230
3231        let needed = policy.required_acks(electable_count);
3232        if needed == 0 {
3233            // Single-node group, or All with only the master itself.
3234            return Ok(0);
3235        }
3236
3237        // REP-9 Part 2: the commit's VLSN is the key.  The master assigns a
3238        // VLSN when it logs the TxnCommit (via the shared `wal_vlsn_counter`
3239        // bumped in `EnvironmentImpl::log_txn_commit`), immediately before
3240        // this gate runs.  The latest assigned VLSN therefore IS this
3241        // commit's VLSN (the trait contract: "implementations are responsible
3242        // for assigning the commit VLSN internally").  We wait until a quorum
3243        // of qualifying electable replicas have acked a VLSN >= the commit
3244        // VLSN — faithful to JE `FeederManager.getNumCurrentAckFeeders`, which
3245        // counts feeders whose `getReplicaTxnEndVLSN() >= commitVLSN` (a
3246        // high-water `>=` test, NOT an exact-VLSN match).
3247        //
3248        // ponytail: reads the global high-water VLSN, so a concurrent later
3249        // commit can make this gate wait on a slightly higher VLSN than its
3250        // own. That is strictly SAFE (waiting for >= a newer VLSN never
3251        // returns early) and only marginally less precise; thread the
3252        // per-txn VLSN through the trait if exact per-commit granularity is
3253        // ever needed.
3254        let commit_vlsn = self.wal_vlsn_counter.load(Ordering::Acquire);
3255
3256        // Register on the AckTracker too: this is what `record_ack` notifies,
3257        // so the condvar wakes us as acks land.  The satisfaction decision
3258        // itself is the high-water feeder count below.
3259        self.ack_tracker.register(commit_vlsn, needed);
3260
3261        // Block on the ack condvar until a quorum of electable feeders hold
3262        // the commit VLSN, the timeout elapses, or shutdown is signalled — no
3263        // spin-poll (JE FeederTxns.TxnInfo uses a per-transaction
3264        // CountDownLatch.await; the AckTracker condvar is the shared-mutex
3265        // equivalent). record_ack notifies us as acks arrive.
3266        let satisfied = self.ack_tracker.wait_for_predicate(
3267            timeout,
3268            || self.count_ack_feeders_ge(commit_vlsn) >= needed,
3269            || self.is_shutdown(),
3270        );
3271        if satisfied {
3272            self.ack_tracker.cleanup_through(commit_vlsn);
3273            return Ok(needed);
3274        }
3275        if self.is_shutdown() {
3276            self.ack_tracker.cleanup_through(commit_vlsn);
3277            return Err(AckWaitError {
3278                kind: AckWaitErrorKind::Shutdown,
3279                needed,
3280                received: 0,
3281            });
3282        }
3283        // Timed out: report the partial ack count (qualifying electable
3284        // feeders holding the commit VLSN) so the caller can surface
3285        // InsufficientReplicas.
3286        let received = self.count_ack_feeders_ge(commit_vlsn);
3287        self.ack_tracker.cleanup_through(commit_vlsn);
3288        Err(AckWaitError { kind: AckWaitErrorKind::Timeout, needed, received })
3289    }
3290
3291    /// X-3: allocate the next VLSN for a recovered XA commit and register
3292    /// `lsn` in the VLSN index so feeders can stream the commit.
3293    ///
3294    /// Increments off the current latest VLSN so the new VLSN is strictly
3295    /// monotonically increasing.  In a single-node or master-less environment
3296    /// (not master) returns 0 (NULL_VLSN — harmless, the default).
3297    fn alloc_vlsn_for_recovered_commit(&self, lsn: noxu_util::Lsn) -> u64 {
3298        // Only allocate a VLSN when we are the master; on a replica the
3299        // recovered XA should have been replicated by the original master.
3300        if !self.is_master() {
3301            return 0;
3302        }
3303        let next_vlsn = self.vlsn_index.get_latest_vlsn() + 1;
3304        // A recovered XA commit is a commit log entry; dispatch as TxnCommit
3305        // so lastTxnEnd/lastSync advance (REP-5).
3306        self.vlsn_index.register_with_type(
3307            next_vlsn,
3308            lsn.file_number(),
3309            lsn.file_offset(),
3310            noxu_log::LogEntryType::TxnCommit,
3311        );
3312        log::debug!(
3313            "alloc_vlsn_for_recovered_commit: allocated vlsn={} for lsn={:?}",
3314            next_vlsn,
3315            lsn
3316        );
3317        next_vlsn
3318    }
3319
3320    /// R-3: pre-allocate the next commit VLSN WITHOUT registering in the index.
3321    ///
3322    /// The caller writes the `TxnCommit` WAL entry with this VLSN embedded,
3323    /// then calls `register_recovered_commit_vlsn` with the actual commit LSN.
3324    /// This two-step approach ensures the WAL entry carries the VLSN so the
3325    /// X-14 VLSN rebuild on second crash can find it.
3326    fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64 {
3327        if !self.is_master() {
3328            return 0;
3329        }
3330        // Peek at the next VLSN without registering.  The actual registration
3331        // happens in register_recovered_commit_vlsn() after the WAL write.
3332        self.vlsn_index.get_latest_vlsn() + 1
3333    }
3334
3335    /// R-3: register a pre-allocated VLSN in the VLSN index with the actual
3336    /// commit LSN.  Called after writing the `TxnCommit` WAL entry.
3337    fn register_recovered_commit_vlsn(
3338        &self,
3339        vlsn: u64,
3340        commit_lsn: noxu_util::Lsn,
3341    ) {
3342        if vlsn == 0 || !self.is_master() {
3343            return;
3344        }
3345        // The pre-allocated VLSN is for a TxnCommit WAL entry; dispatch the
3346        // type so lastTxnEnd/lastSync advance (REP-5).
3347        self.vlsn_index.register_with_type(
3348            vlsn,
3349            commit_lsn.file_number(),
3350            commit_lsn.file_offset(),
3351            noxu_log::LogEntryType::TxnCommit,
3352        );
3353        log::debug!(
3354            "register_recovered_commit_vlsn: registered vlsn={} for commit_lsn={:?}",
3355            vlsn,
3356            commit_lsn
3357        );
3358    }
3359}
3360
3361#[cfg(test)]
3362mod tests {
3363    use super::*;
3364    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};
3365
3366    /// Helper to create a test config with a fixed port (unit-test style,
3367    /// no real TCP bind needed — hostname "localhost" resolves but the port
3368    /// might be in use; use `test_config_port0` for real TCP tests).
3369    fn test_config(node_name: &str) -> RepConfig {
3370        RepConfig::builder("test_group", node_name, "localhost")
3371            .node_port(5001)
3372            .build()
3373    }
3374
3375    /// Helper to create a test config that binds to an OS-assigned port.
3376    fn test_config_port0(node_name: &str) -> RepConfig {
3377        RepConfig::builder("test_group", node_name, "127.0.0.1")
3378            .node_port(0)
3379            .build()
3380    }
3381
3382    #[test]
3383    fn test_initial_state_is_detached() {
3384        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3385        // NodeStateMachine starts in Detached state
3386        assert_eq!(env.get_state(), NodeState::Detached);
3387        assert!(!env.is_master());
3388        assert!(!env.is_replica());
3389        assert!(!env.is_active());
3390    }
3391
3392    #[test]
3393    fn test_become_master() {
3394        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3395        env.become_master(1).unwrap();
3396        assert_eq!(env.get_state(), NodeState::Master);
3397        assert!(env.is_master());
3398        assert!(!env.is_replica());
3399        assert!(env.is_active());
3400    }
3401
3402    #[test]
3403    fn test_become_replica() {
3404        let env = ReplicatedEnvironment::new(test_config("node2")).unwrap();
3405        env.become_replica("node1").unwrap();
3406        assert_eq!(env.get_state(), NodeState::Replica);
3407        assert!(!env.is_master());
3408        assert!(env.is_replica());
3409        assert!(env.is_active());
3410    }
3411
3412    #[test]
3413    fn test_get_node_name() {
3414        let env = ReplicatedEnvironment::new(test_config("my_node")).unwrap();
3415        assert_eq!(env.get_node_name(), "my_node");
3416    }
3417
3418    #[test]
3419    fn test_get_group_name() {
3420        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3421        assert_eq!(env.get_group_name(), "test_group");
3422    }
3423
3424    #[test]
3425    fn test_register_vlsn_updates_index() {
3426        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3427        env.register_vlsn(1, 0, 100);
3428        env.register_vlsn(2, 0, 200);
3429        env.register_vlsn(3, 0, 300);
3430
3431        assert_eq!(env.get_current_vlsn(), 3);
3432        let range = env.get_vlsn_range();
3433        assert_eq!(range.first(), 1);
3434        assert_eq!(range.last(), 3);
3435    }
3436
3437    #[test]
3438    fn test_record_ack() {
3439        use crate::node_type::NodeType;
3440        use crate::rep_node::RepNode;
3441        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3442        env.become_master(1).unwrap();
3443        // replicaAcksQualify: only ELECTABLE replicas count toward durability,
3444        // so the replica must be a known electable member of the group.
3445        env.add_peer(RepNode::new(
3446            "replica1".to_string(),
3447            NodeType::Electable,
3448            "127.0.0.1".to_string(),
3449            6001,
3450            2,
3451        ))
3452        .unwrap();
3453
3454        env.register_vlsn(1, 0, 100);
3455        // Register a pending ack requirement, then record ack
3456        env.get_ack_tracker().register(1, 1);
3457        env.record_ack(1, "replica1");
3458        // Ack should be satisfied
3459        assert!(env.get_ack_tracker().is_satisfied(1));
3460    }
3461
3462    #[test]
3463    fn test_record_ack_from_non_electable_does_not_qualify() {
3464        use crate::node_type::NodeType;
3465        use crate::rep_node::RepNode;
3466        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3467        env.become_master(1).unwrap();
3468        // A Monitor is NOT electable -> its ack must not count (JE
3469        // DurabilityQuorum.replicaAcksQualify).
3470        env.add_peer(RepNode::new(
3471            "monitor1".to_string(),
3472            NodeType::Monitor,
3473            "127.0.0.1".to_string(),
3474            6002,
3475            3,
3476        ))
3477        .unwrap();
3478        env.register_vlsn(1, 0, 100);
3479        env.get_ack_tracker().register(1, 1);
3480        env.record_ack(1, "monitor1");
3481        assert!(
3482            !env.get_ack_tracker().is_satisfied(1),
3483            "non-electable ack must not satisfy durability quorum"
3484        );
3485        // An unknown replica likewise does not qualify.
3486        env.record_ack(1, "ghost");
3487        assert!(!env.get_ack_tracker().is_satisfied(1));
3488    }
3489
3490    #[test]
3491    fn test_authoritative_quorum_met() {
3492        // 1-node group (electable_total=1): master alone IS authoritative
3493        // (quorum_size = 1/2+1 = 1; 0 replicas + 1 >= 1).
3494        assert!(ReplicatedEnvironment::authoritative_quorum_met(0, 1));
3495        // 3-node group (electable_total=3, quorum_size=2): master with 0
3496        // connected replicas is the minority -> NOT authoritative.
3497        assert!(!ReplicatedEnvironment::authoritative_quorum_met(0, 3));
3498        // 3-node group with 1 connected electable replica -> 1+1=2 >= 2 -> yes.
3499        assert!(ReplicatedEnvironment::authoritative_quorum_met(1, 3));
3500        // 5-node group (quorum_size=3): need 2 connected replicas.
3501        assert!(!ReplicatedEnvironment::authoritative_quorum_met(1, 5));
3502        assert!(ReplicatedEnvironment::authoritative_quorum_met(2, 5));
3503    }
3504
3505    #[test]
3506    fn test_is_authoritative_master_requires_master_role() {
3507        // A non-master is never authoritative regardless of connections.
3508        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3509        assert!(!env.is_master());
3510        assert!(!env.is_authoritative_master());
3511        // A single-node master (no peers) IS authoritative.
3512        env.become_master(1).unwrap();
3513        assert!(env.is_authoritative_master());
3514    }
3515
3516    #[test]
3517    fn test_dtvlsn_update_max_advances_only() {
3518        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3519        assert_eq!(env.get_dtvlsn(), 0);
3520        assert_eq!(env.update_dtvlsn(10), 10);
3521        assert_eq!(env.get_dtvlsn(), 10);
3522        // A lower candidate must not move it backward.
3523        assert_eq!(env.update_dtvlsn(5), 10);
3524        assert_eq!(env.get_dtvlsn(), 10);
3525        // Equal is a no-op.
3526        assert_eq!(env.update_dtvlsn(10), 10);
3527        // set_dtvlsn (replica path) is also advance-only.
3528        env.set_dtvlsn(7);
3529        assert_eq!(env.get_dtvlsn(), 10);
3530        env.set_dtvlsn(20);
3531        assert_eq!(env.get_dtvlsn(), 20);
3532    }
3533
3534    #[test]
3535    fn test_dtvlsn_majority_min_across_feeders() {
3536        use crate::node_type::NodeType;
3537        use crate::rep_node::RepNode;
3538        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3539        env.become_master(1).unwrap();
3540        // Three electable replicas → electable_count = 4 (incl. master) →
3541        // durable_ack_count = 2. With master self-ack, DTVLSN advances to the
3542        // min of the 2 highest qualifying feeders that exceed the current
3543        // DTVLSN.
3544        for (i, name) in ["r1", "r2", "r3"].iter().enumerate() {
3545            env.add_peer(RepNode::new(
3546                name.to_string(),
3547                NodeType::Electable,
3548                "127.0.0.1".to_string(),
3549                6100 + i as u16,
3550                (i + 2) as u32,
3551            ))
3552            .unwrap();
3553        }
3554        // Register feeders with differing acked VLSNs: r1=100, r2=80, r3=50.
3555        for (name, vlsn) in [("r1", 100u64), ("r2", 80), ("r3", 50)] {
3556            let f = crate::stream::feeder::Feeder::new(name.to_string());
3557            f.record_ack(vlsn);
3558            env.feeders.write().push(f);
3559        }
3560        env.update_dtvlsn_from_feeders();
3561        // First two qualifying feeders encountered are r1(100), r2(80);
3562        // min(100,80)=80 and that is a majority (2 of 4) → DTVLSN = 80.
3563        // (r3=50 < 80 is not required for durability.)
3564        assert!(
3565            env.get_dtvlsn() >= 80,
3566            "DTVLSN must reach the majority-min (>=80), got {}",
3567            env.get_dtvlsn()
3568        );
3569    }
3570
3571    #[test]
3572    fn test_close_sets_shutdown() {
3573        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3574        assert!(!env.is_shutdown());
3575
3576        env.close().unwrap();
3577        assert!(env.is_shutdown());
3578        // After close, state should be Shutdown
3579        assert_eq!(env.get_state(), NodeState::Shutdown);
3580    }
3581
3582    #[test]
3583    fn test_close_is_idempotent() {
3584        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3585        env.close().unwrap();
3586        env.close().unwrap(); // Should not error
3587        assert!(env.is_shutdown());
3588    }
3589
3590    #[test]
3591    fn test_cannot_become_master_when_shutdown() {
3592        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3593        env.close().unwrap();
3594
3595        let result = env.become_master(1);
3596        assert!(result.is_err());
3597    }
3598
3599    #[test]
3600    fn test_cannot_become_replica_when_shutdown() {
3601        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3602        env.close().unwrap();
3603
3604        let result = env.become_replica("master");
3605        assert!(result.is_err());
3606    }
3607
3608    #[test]
3609    fn test_cannot_apply_entry_when_shutdown() {
3610        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3611        env.close().unwrap();
3612
3613        let result = env.apply_entry(1, 0, vec![1, 2, 3]);
3614        assert!(result.is_err());
3615    }
3616
3617    #[test]
3618    fn test_cannot_transfer_master_when_not_master() {
3619        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3620        env.become_replica("other").unwrap();
3621
3622        let config = MasterTransferConfig::new(
3623            "target_node".to_string(),
3624            Duration::from_secs(30),
3625        );
3626        let result = env.transfer_master(config);
3627        assert!(result.is_err());
3628    }
3629
3630    #[test]
3631    fn test_transfer_master_requires_registered_target() {
3632        // F7: transfer_master is no longer a no-op; it sends an ADMIN
3633        // TRANSFER_MASTER signal to the target via TCP.  An unregistered
3634        // target is rejected at the address-resolution step.
3635        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3636        env.become_master(1).unwrap();
3637
3638        let config = MasterTransferConfig::new(
3639            "unknown_target".to_string(),
3640            Duration::from_secs(30),
3641        );
3642        let result = env.transfer_master(config);
3643        assert!(
3644            result.is_err(),
3645            "transfer_master to unregistered target must error"
3646        );
3647    }
3648
3649    #[test]
3650    fn test_apply_entry_registers_vlsn() {
3651        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3652        env.become_replica("master").unwrap();
3653
3654        env.apply_entry(1, 0, vec![1, 2, 3]).unwrap();
3655        env.apply_entry(2, 0, vec![4, 5, 6]).unwrap();
3656
3657        assert_eq!(env.get_current_vlsn(), 2);
3658    }
3659
3660    #[test]
3661    fn test_master_name_tracking() {
3662        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3663
3664        // Initially no master known
3665        assert!(env.get_master_name().is_none());
3666
3667        // After becoming master, this node is the master
3668        env.become_master(1).unwrap();
3669        assert_eq!(env.get_master_name(), Some("node1".to_string()));
3670    }
3671
3672    #[test]
3673    fn test_master_to_replica_transition() {
3674        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3675
3676        // Become master first
3677        env.become_master(1).unwrap();
3678        assert_eq!(env.get_master_name(), Some("node1".to_string()));
3679
3680        // Transition to replica (Master -> Replica is valid)
3681        env.become_replica("other_master").unwrap();
3682        assert_eq!(env.get_master_name(), Some("other_master".to_string()));
3683        assert!(env.is_replica());
3684    }
3685
3686    #[test]
3687    fn test_state_change_listener_notification() {
3688        struct TestListener {
3689            call_count: AtomicU32,
3690            last_new_state: noxu_sync::Mutex<Option<NodeState>>,
3691        }
3692
3693        impl StateChangeListener for TestListener {
3694            fn on_state_change(&self, event: StateChangeEvent) {
3695                self.call_count.fetch_add(1, AtomicOrdering::SeqCst);
3696                *self.last_new_state.lock() = Some(event.new_state);
3697            }
3698        }
3699
3700        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3701        let listener = Arc::new(TestListener {
3702            call_count: AtomicU32::new(0),
3703            last_new_state: noxu_sync::Mutex::new(None),
3704        });
3705
3706        // Setting the listener should trigger an immediate notification
3707        env.set_state_change_listener(listener.clone());
3708        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 1);
3709
3710        // State change should trigger another notification
3711        env.become_master(1).unwrap();
3712        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 2);
3713        assert_eq!(*listener.last_new_state.lock(), Some(NodeState::Master));
3714    }
3715
3716    #[test]
3717    fn test_close_notifies_listeners() {
3718        struct ShutdownListener {
3719            shutdown_seen: AtomicBool,
3720        }
3721
3722        impl StateChangeListener for ShutdownListener {
3723            fn on_state_change(&self, event: StateChangeEvent) {
3724                if event.new_state == NodeState::Shutdown {
3725                    self.shutdown_seen.store(true, AtomicOrdering::SeqCst);
3726                }
3727            }
3728        }
3729
3730        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3731        let listener = Arc::new(ShutdownListener {
3732            shutdown_seen: AtomicBool::new(false),
3733        });
3734
3735        // The initial notification is for the current (Detached) state
3736        env.set_state_change_listener(listener.clone());
3737
3738        // Become master first so the close transition is meaningful
3739        env.become_master(1).unwrap();
3740        assert!(!listener.shutdown_seen.load(AtomicOrdering::SeqCst));
3741
3742        env.close().unwrap();
3743        assert!(listener.shutdown_seen.load(AtomicOrdering::SeqCst));
3744    }
3745
3746    #[test]
3747    fn test_shutdown_group_requires_master() {
3748        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3749        env.become_replica("other").unwrap();
3750
3751        let result = env.shutdown_group(5000);
3752        assert!(result.is_err());
3753    }
3754
3755    #[test]
3756    fn test_shutdown_group_as_master() {
3757        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3758        env.become_master(1).unwrap();
3759
3760        let result = env.shutdown_group(5000);
3761        assert!(result.is_ok());
3762        assert!(env.is_shutdown());
3763    }
3764
3765    #[test]
3766    fn test_get_config() {
3767        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3768        assert_eq!(env.get_config().node_name, "node1");
3769        assert_eq!(env.get_config().group_name, "test_group");
3770    }
3771
3772    #[test]
3773    fn test_get_stats() {
3774        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3775        let _stats = env.get_stats();
3776        // Just verify we can access stats without panicking
3777    }
3778
3779    // -----------------------------------------------------------------------
3780    // TCP dispatcher tests (H-5 / H-7)
3781    // -----------------------------------------------------------------------
3782
3783    #[test]
3784    fn test_tcp_dispatcher_starts_on_new() {
3785        // Use port 0 so the OS assigns an ephemeral port.
3786        let env =
3787            ReplicatedEnvironment::new(test_config_port0("tcp_node")).unwrap();
3788        // The dispatcher must have started and bound a real port.
3789        let addr = env.bound_addr();
3790        assert!(addr.is_some(), "expected a bound address");
3791        let addr = addr.unwrap();
3792        assert_ne!(addr.port(), 0, "OS should assign a non-zero port");
3793    }
3794
3795    #[test]
3796    fn test_tcp_dispatcher_stops_on_close() {
3797        let env =
3798            ReplicatedEnvironment::new(test_config_port0("tcp_node2")).unwrap();
3799        // Dispatcher is running.
3800        assert!(
3801            env.tcp_dispatcher
3802                .as_ref()
3803                .map(|d| d.is_running())
3804                .unwrap_or(false)
3805        );
3806
3807        env.close().unwrap();
3808
3809        // After close, dispatcher must be stopped.
3810        assert!(
3811            !env.tcp_dispatcher
3812                .as_ref()
3813                .map(|d| d.is_running())
3814                .unwrap_or(false),
3815            "dispatcher should be stopped after close"
3816        );
3817    }
3818
3819    #[test]
3820    fn test_tcp_dispatcher_accepts_connection() {
3821        use crate::net::Channel;
3822        use crate::net::ServiceHandler;
3823        use crate::net::service_dispatcher::connect_to_service;
3824        use std::sync::atomic::{AtomicU32, Ordering as AO};
3825        use std::time::Duration;
3826
3827        struct PingHandler {
3828            count: AtomicU32,
3829        }
3830        impl ServiceHandler for PingHandler {
3831            fn service_name(&self) -> &str {
3832                "ping"
3833            }
3834            fn handle(&self, ch: Box<dyn Channel>) -> crate::error::Result<()> {
3835                self.count.fetch_add(1, AO::SeqCst);
3836                // Echo the first message back.
3837                if let Ok(Some(msg)) = ch.receive(Duration::from_secs(2)) {
3838                    let _ = ch.send(&msg);
3839                }
3840                Ok(())
3841            }
3842        }
3843
3844        let env =
3845            ReplicatedEnvironment::new(test_config_port0("tcp_node3")).unwrap();
3846        let addr = env.bound_addr().expect("dispatcher must be bound");
3847
3848        // Register a ping handler on the running dispatcher.
3849        if let Some(ref disp) = env.tcp_dispatcher {
3850            let handler = Arc::new(PingHandler { count: AtomicU32::new(0) });
3851            disp.register("ping", handler.clone());
3852
3853            // Give the accept thread a moment.
3854            std::thread::sleep(Duration::from_millis(20));
3855
3856            let client = connect_to_service(addr, "ping").unwrap();
3857            client.send(b"hello").unwrap();
3858            let reply = client.receive(Duration::from_secs(2)).unwrap();
3859            assert_eq!(reply, Some(b"hello".to_vec()));
3860
3861            assert_eq!(handler.count.load(AO::SeqCst), 1);
3862        }
3863
3864        env.close().unwrap();
3865    }
3866
3867    #[test]
3868    fn test_become_master_auto_transitions_from_detached() {
3869        // The state machine requires Detached -> Unknown -> Master.
3870        // become_master() should handle this automatically.
3871        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3872        assert_eq!(env.get_state(), NodeState::Detached);
3873        env.become_master(1).unwrap();
3874        assert_eq!(env.get_state(), NodeState::Master);
3875    }
3876
3877    #[test]
3878    fn test_become_replica_auto_transitions_from_detached() {
3879        // The state machine requires Detached -> Unknown -> Replica.
3880        // become_replica() should handle this automatically.
3881        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3882        assert_eq!(env.get_state(), NodeState::Detached);
3883        env.become_replica("master_node").unwrap();
3884        assert_eq!(env.get_state(), NodeState::Replica);
3885    }
3886
3887    #[test]
3888    fn test_cannot_transfer_master_when_shutdown() {
3889        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3890        env.become_master(1).unwrap();
3891        env.close().unwrap();
3892
3893        let config = MasterTransferConfig::new(
3894            "target".to_string(),
3895            Duration::from_secs(30),
3896        );
3897        let result = env.transfer_master(config);
3898        assert!(result.is_err());
3899    }
3900
3901    #[test]
3902    fn test_full_lifecycle() {
3903        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3904
3905        // Start as detached
3906        assert_eq!(env.get_state(), NodeState::Detached);
3907
3908        // Become master
3909        env.become_master(1).unwrap();
3910        assert!(env.is_master());
3911
3912        // Register some VLSNs
3913        env.register_vlsn(1, 0, 100);
3914        env.register_vlsn(2, 0, 200);
3915
3916        // Record ack from replica
3917        env.record_ack(1, "replica1");
3918        env.record_ack(2, "replica1");
3919
3920        // Transition to replica (simulating failover)
3921        env.become_replica("node2").unwrap();
3922        assert!(env.is_replica());
3923
3924        // Apply entries from new master
3925        env.apply_entry(3, 0, vec![7, 8, 9]).unwrap();
3926
3927        // Close
3928        env.close().unwrap();
3929        assert!(env.is_shutdown());
3930    }
3931
3932    /// Verify that `with_environment` lazily registers the RESTORE service on
3933    /// the TCP dispatcher when `config.env_home` was not set at construction.
3934    ///
3935    /// This mirrors`RepNode.envSetup()` which registers the restore handler
3936    /// when the environment is wired into the replicated node.
3937    #[test]
3938    fn test_restore_registered_lazily_via_with_environment() {
3939        use noxu_dbi::EnvironmentImpl;
3940        use tempfile::TempDir;
3941
3942        let dir = TempDir::new().expect("temp dir");
3943
3944        // Build config WITHOUT env_home — dispatcher starts, but no RESTORE handler yet.
3945        let config = RepConfig::builder("test_group", "node1", "127.0.0.1")
3946            .node_port(0)
3947            .build();
3948
3949        let rep_env = ReplicatedEnvironment::new(config).unwrap();
3950
3951        // Not yet registered.
3952        assert!(
3953            !rep_env
3954                .restore_registered
3955                .load(std::sync::atomic::Ordering::SeqCst)
3956        );
3957
3958        // Wire in a real EnvironmentImpl so get_env_home() returns the temp dir.
3959        let env_impl = Arc::new(
3960            EnvironmentImpl::new(dir.path(), false, false).expect("open env"),
3961        );
3962        rep_env.with_environment(env_impl);
3963
3964        // Now the RESTORE service must be registered.
3965        assert!(
3966            rep_env
3967                .restore_registered
3968                .load(std::sync::atomic::Ordering::SeqCst)
3969        );
3970    }
3971
3972    /// Verify that when `config.env_home` IS set at construction, the RESTORE
3973    /// service is registered immediately (not deferred).
3974    #[test]
3975    fn test_restore_registered_eagerly_when_env_home_in_config() {
3976        use tempfile::TempDir;
3977
3978        let dir = TempDir::new().expect("temp dir");
3979
3980        let config = RepConfig::builder("test_group", "node2", "127.0.0.1")
3981            .node_port(0)
3982            .env_home(dir.path())
3983            .build();
3984
3985        let rep_env = ReplicatedEnvironment::new(config).unwrap();
3986
3987        // Should be registered immediately (env_home was in config).
3988        assert!(
3989            rep_env
3990                .restore_registered
3991                .load(std::sync::atomic::Ordering::SeqCst)
3992        );
3993    }
3994}