Skip to main content

noxu_rep/
replicated_environment.rs

1//! The main replicated environment API.
2//!
3//!
4//! A replicated database environment that is a node in a replication group.
5//! This is the entry point for replication. It wraps a standard Environment
6//! and adds replication capabilities including master election, replica
7//! streaming, and commit acknowledgments.
8//!
9//! # Replication node states
10//!
11//! The replication node state determines the operations that the application
12//! can perform against its replicated environment. The state transitions
13//! visible to the application can be summarized by the regular expression:
14//!
15//! ```text
16//! [ MASTER | REPLICA | UNKNOWN ]+ DETACHED
17//! ```
18//!
19//! When the first handle to a `ReplicatedEnvironment` is created and the node
20//! is brought up, the node usually establishes Master or Replica state. These
21//! states are preceded by the Unknown state. As various remote nodes become
22//! unavailable and elections are held, the local node may change between
23//! Master and Replica states, always with a (usually brief) transition through
24//! Unknown state.
25//!
26//! When the environment is closed, the node transitions to the Detached state.
27
28use noxu_dbi::{
29    AckWaitError, AckWaitErrorKind, EnvironmentImpl, ReplicaAckCoordinator,
30    ReplicaAckPolicyKind,
31};
32use noxu_sync::RwLock;
33use std::net::SocketAddr;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::OnceLock;
37use std::sync::Weak;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use crate::ack_tracker::AckTracker;
42use crate::elections::election_service::{
43    ELECTION_SERVICE_NAME, ElectionAcceptorState, ElectionService,
44};
45use crate::elections::master_tracker::MasterTracker;
46use crate::error::{RepError, Result};
47use crate::group_service::GroupService;
48use crate::master_transfer::MasterTransferConfig;
49use crate::net::service_dispatcher::{
50    AnyServiceDispatcher, TcpServiceDispatcher,
51};
52use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
53use crate::network_restore_server::{
54    NetworkRestoreServer, RESTORE_SERVICE_NAME,
55};
56use crate::node_state::{NodeState, NodeStateMachine};
57use crate::rep_config::RepConfig;
58use crate::rep_stats::RepStats;
59use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
60use crate::stream::feeder::EnvironmentLogScanner;
61use crate::stream::feeder::Feeder;
62use crate::stream::feeder::FeederRunner;
63use crate::stream::peer_feeder::PeerScannerAdapter;
64use crate::stream::peer_feeder::{
65    PEER_FEEDER_SERVICE_NAME, PeerFeederService, PeerLogScanner,
66};
67use crate::stream::replica_stream::{EnvironmentLogWriter, ReplicaStream};
68use crate::vlsn::vlsn_index::VlsnIndex;
69use crate::vlsn::vlsn_range::VlsnRange;
70use std::collections::HashMap;
71
/// Default heartbeat timeout for master liveness detection.
///
/// Passed to `MasterTracker::new` when a `ReplicatedEnvironment` is
/// constructed.
const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);
74
/// A replicated database environment.
///
/// This is the entry point for replication. It wraps a standard Environment
/// and adds replication capabilities including master election, replica
/// streaming, and commit acknowledgments.
///
/// High Availability (HA) provides a replicated, embedded database
/// management system which provides fast, reliable, and scalable data
/// management. HA enables replication of an environment across a Replication
/// Group. A `ReplicatedEnvironment` is a single node in the replication group.
///
/// `ReplicatedEnvironment` wraps a standard `Environment`. All database
/// operations are executed in the same fashion in both replicated and
/// non-replicated applications. A `ReplicatedEnvironment` must be
/// transactional. All replicated databases created in the replicated
/// environment must be transactional as well.
///
/// A `ReplicatedEnvironment` joins its replication group when it is created.
/// When `new()` returns, the node will have established contact with the other
/// members of the group and will be ready to service operations.
///
/// NOTE(review): the `new()` in this file only builds node-local state and
/// starts the service dispatcher; it is `open()` that starts the election
/// driver. The paragraph above describes the intended lifecycle — confirm
/// which constructor it is meant to refer to.
///
/// Replicated environments can be created with node type Electable or
/// Secondary. Electable nodes can be masters or replicas, and participate in
/// both master elections and commit durability decisions. Secondary nodes can
/// only be replicas, not masters, and do not participate in either elections or
/// durability decisions.
///
/// # Example
///
/// ```ignore
/// use noxu_rep::{ReplicatedEnvironment, RepConfig};
///
/// let config = RepConfig::builder("my_group", "node1", "localhost")
///     .node_port(5001)
///     .build();
/// let rep_env = ReplicatedEnvironment::new(config).unwrap();
/// ```
pub struct ReplicatedEnvironment {
    /// The replication configuration for this node.
    config: RepConfig,
    /// Tracks the current node state (Detached, Unknown, Master, Replica).
    node_state: NodeStateMachine,
    /// Service for managing the replication group membership.
    group_service: GroupService,
    /// Maps VLSNs to log file positions.
    ///
    /// Wrapped in `Arc` so that background daemons (election driver,
    /// VLSN-index persistence flusher) can share access without
    /// borrowing the env.  Closes finding F11 of the 2026 review.
    vlsn_index: Arc<VlsnIndex>,
    /// Tracks acknowledgments from replicas (used by master).
    ack_tracker: AckTracker,
    /// Replication statistics.
    stats: RepStats,
    /// Active feeder threads (master -> replica streams).
    feeders: RwLock<Vec<Feeder>>,
    /// Replica stream for receiving updates from the master.
    replica_stream: ReplicaStream,
    /// Tracks the current master node.
    master_tracker: MasterTracker,
    /// State change listeners.
    listeners: RwLock<Vec<Arc<dyn StateChangeListener>>>,
    /// Shutdown flag.
    shutdown: AtomicBool,
    /// Service dispatcher — listens on the replication port and routes
    /// incoming connections to the appropriate service handler (feeder, etc.).
    ///
    /// `Plain`: plain TCP (default / Phase-2 behaviour).
    /// `Tls`: TLS + mTLS enforcement (Phase 3, when `RepConfig::tls_config` is set
    /// and `transport_kind` is `Tls`).
    ///
    /// `None` when the listen address could not be turned into a socket
    /// address or when the dispatcher failed to start; both cases are only
    /// logged by `new()`.
    tcp_dispatcher: Option<AnyServiceDispatcher>,
    /// The address the `tcp_dispatcher` is actually bound to (may differ from
    /// the configured port when port 0 is used in tests).
    bound_addr: Option<SocketAddr>,

    /// Optional live `EnvironmentImpl` wired in via `with_environment()`.
    ///
    /// When set, `become_master` spawns a `FeederRunner` per replica using
    /// `EnvironmentLogScanner`, and `become_replica` spawns a
    /// `ReplicaReceiver` thread using `EnvironmentLogWriter`.
    env_impl: StdMutex<Option<Arc<EnvironmentImpl>>>,

    /// Background I/O thread handles spawned during state transitions.
    ///
    /// Stored so that `close()` can join them cleanly.  The vector holds
    /// plain `JoinHandle`s (not `Option`s), so whoever joins them has to
    /// drain or take the vector itself.
    io_threads: StdMutex<Vec<std::thread::JoinHandle<()>>>,

    /// Shutdown flag shared with I/O threads so they terminate when the
    /// environment is closed.
    io_shutdown: AtomicBool,

    /// Whether the RESTORE service has been registered on the TCP dispatcher.
    ///
    /// When `config.env_home` is `None` at construction time, registration is
    /// deferred until `with_environment()` provides the env home path.
    restore_registered: AtomicBool,

    /// In-memory log queue used by the peer feeder service.
    ///
    /// When this node is a replica, `apply_entry()` pushes each received log
    /// entry here.  The `PeerFeederService` registered on the TCP dispatcher
    /// reads from this queue to stream entries to downstream replicas that
    /// are behind this node (peer-to-peer log distribution, HA style).
    peer_scanner: Arc<PeerLogScanner>,

    /// Durable Transaction VLSN (D7, JE RepNode.dtvlsn): the highest VLSN
    /// known to have been replicated to a *majority* of the electable
    /// replicas. On a master it is computed from feeder ack/heartbeat progress
    /// (`update_dtvlsn_from_feeders`); on a replica it is set from commit/abort
    /// records in the stream (`set_dtvlsn`). It advances monotonically (an
    /// `update_max`). 0 = NULL_VLSN. Used by the election ranking (D2) so the
    /// most-durable node, not merely the highest-raw-VLSN node, wins.
    dtvlsn: std::sync::atomic::AtomicU64,

    /// Shared acceptor state used by the ELECTION service handler.
    /// The election driver updates `own_vlsn` / `own_term` here as the
    /// node progresses; incoming acceptor sessions read it on every
    /// connection so their replies always reflect the local node's
    /// most recent state.  Closes finding F6.
    election_state: Arc<ElectionAcceptorState>,

    /// Self-referential `Weak` populated once the env has been wrapped
    /// in an `Arc`.  Used by the replica I/O thread spawned in
    /// `become_replica` so it can call `bootstrap_via_dispatcher` when
    /// the master signals `NeedsRestore`.
    ///
    /// Populated lazily via [`Self::init_self_weak`] from `open()` and
    /// the test harness.  When unset (callers that build the env via
    /// raw `Arc::new(Self::new(...))` and never call `init_self_weak`)
    /// the I/O thread falls back to operator-driven bootstrap.
    self_weak: OnceLock<Weak<Self>>,

    // -----------------------------------------------------------------------
    // C-C2: active push-feeder infrastructure
    // -----------------------------------------------------------------------
    /// Per-replica channels injected via [`Self::register_feeder_channel`].
    ///
    /// When [`Self::become_master`] is called (or when the node is already
    /// master), a [`FeederRunner`] thread is spawned for each registered
    /// channel, actively streaming entries to that replica over the channel.
    ///
    /// Using `register_feeder_channel` is the primary integration point for
    /// the push-based feeder path.  Production deployments wire in a
    /// `TcpChannel`; test code uses `LocalChannelPair`.
    feeder_channels: StdMutex<HashMap<String, Arc<dyn crate::net::Channel>>>,

    /// Per-replica dedicated entry queues backing the push-feeder path.
    ///
    /// Each `FeederRunner` thread reads exclusively from its replica's queue.
    /// [`Self::replicate_entry`] and [`Self::apply_entry`] fan out into all
    /// registered queues so the push runners receive entries without competing
    /// with [`PeerFeederService`] for ownership of `peer_scanner`.
    feeder_queues: std::sync::RwLock<HashMap<String, Arc<PeerLogScanner>>>,

    /// Active `FeederRunner` references for acked-VLSN queries and
    /// clean shutdown (M-4: wait for replicas to catch up).
    active_feeder_runners: StdMutex<HashMap<String, Arc<FeederRunner>>>,

    /// Monotone VLSN counter shared with the wired `EnvironmentImpl`.
    ///
    /// Installed into the environment via
    /// `EnvironmentImpl::set_replication_vlsn_counter()` when
    /// `with_environment` is called.  Each `log_txn_commit` on the master
    /// atomically increments this counter and writes a VLSN-tagged WAL entry,
    /// which `EnvironmentLogScanner` then picks up without any
    /// `replicate_entry` call from the application.
    wal_vlsn_counter: Arc<std::sync::atomic::AtomicU64>,
}
251
252impl ReplicatedEnvironment {
253    /// Create a new replicated environment.
254    ///
255    ///
256    ///
257    /// Creates a replicated environment handle and starts participating in the
258    /// replication group. The node's state is determined when it joins the
259    /// group, and mastership is not preconfigured. If the group has no current
260    /// master, creation will trigger an election to determine whether this node
261    /// will participate as a Master or a Replica.
262    ///
263    /// A brand new node will always join an existing group as a Replica, unless
264    /// it is the very first electable node that is creating the group. In that
265    /// case it joins as the Master of the newly formed singleton group.
266    pub fn new(config: RepConfig) -> Result<Self> {
267        // mTLS Phase 2 (v3.1.0): peer_allowlist enforcement is real at the
268        // TLS channel layer (TlsTcpChannelListener::bind_with_tls_and_allowlist).
269        // Phase 3 (this release): when RepConfig::tls_config is set AND
270        // transport_kind is Tls, the service dispatcher itself enforces mTLS
271        // via TlsTcpServiceDispatcher.  For the remaining cases (no TlsConfig
272        // or non-TLS transport) keep the Phase-2 accurate warn.
273        if !config.peer_allowlist.is_empty() {
274            match config.transport_kind {
275                crate::rep_config::RepTransportKind::Tls => {
276                    if config.tls_config.is_some() {
277                        log::info!(
278                            "[{}] peer_allowlist ({} entries) + tls_config set; \
279                             mTLS will be enforced on the service dispatcher.",
280                            config.node_name,
281                            config.peer_allowlist.len(),
282                        );
283                    } else {
284                        log::info!(
285                            "[{}] peer_allowlist configured ({} entries) but \
286                             tls_config is None — the service dispatcher will \
287                             use plain TCP. Set RepConfig::tls_config to \
288                             activate end-to-end mTLS on this path.",
289                            config.node_name,
290                            config.peer_allowlist.len(),
291                        );
292                    }
293                }
294                _ => {
295                    log::warn!(
296                        "[{}] peer_allowlist is configured ({} entries) but \
297                         transport_kind is not Tls — the allowlist has no \
298                         effect without TLS transport. Set \
299                         RepTransportKind::Tls to activate mTLS enforcement.",
300                        config.node_name,
301                        config.peer_allowlist.len(),
302                    );
303                }
304            }
305        }
306        let node_state = NodeStateMachine::new();
307        let group_service = GroupService::new(config.group_name.clone());
308        let vlsn_index = {
309            // F11: try to load a previously persisted vlsn.idx from
310            // env_home if one exists.  A successfully loaded index lets a
311            // restarted replica resume from where it left off without a
312            // full network restore; a missing or corrupt file falls back
313            // to a fresh in-memory index (caller will need to bootstrap).
314            if let Some(ref home) = config.env_home {
315                match crate::vlsn::persist::load_from_disk(home) {
316                    Ok(Some(idx)) => {
317                        log::info!(
318                            "Node '{}' loaded persisted VLSN index from {} \
319                             ({} entries, latest vlsn={})",
320                            config.node_name,
321                            home.display(),
322                            idx.snapshot_entries().len(),
323                            idx.get_latest_vlsn(),
324                        );
325                        Arc::new(idx)
326                    }
327                    Ok(None) => Arc::new(VlsnIndex::new(10)),
328                    Err(e) => {
329                        log::warn!(
330                            "Node '{}' failed to load persisted VLSN index \
331                             from {}: {} (treating as fresh node — network \
332                             restore required)",
333                            config.node_name,
334                            home.display(),
335                            e
336                        );
337                        // Best-effort: remove the corrupt file so the
338                        // next persist cycle writes a clean one.  A
339                        // missing file is the "fresh node" baseline.
340                        let _ = std::fs::remove_file(
341                            crate::vlsn::persist::index_path(home),
342                        );
343                        Arc::new(VlsnIndex::new(10))
344                    }
345                }
346            } else {
347                Arc::new(VlsnIndex::new(10))
348            }
349        };
350        let ack_tracker = AckTracker::new();
351        let stats = RepStats::new();
352        let feeders = RwLock::new(Vec::new());
353        let replica_stream = ReplicaStream::new();
354        let master_tracker = MasterTracker::new(DEFAULT_HEARTBEAT_TIMEOUT);
355
356        // Start the service dispatcher.
357        //
358        // Phase 3: when RepConfig::tls_config is set AND transport_kind is Tls,
359        // start a TlsTcpServiceDispatcher (mTLS enforced).  Otherwise fall back
360        // to the plain-TCP TcpServiceDispatcher.
361        let listen_addr_str =
362            format!("{}:{}", config.node_host, config.node_port);
363        let mut restore_registered_init = false;
364
365        // Returns (AnyServiceDispatcher, bound_addr) or (None, None) on error.
366        let (tcp_dispatcher, bound_addr) = match listen_addr_str
367            .parse::<SocketAddr>()
368        {
369            Ok(addr) => {
370                let build_result: Result<(AnyServiceDispatcher, SocketAddr)> =
371                    Self::build_dispatcher(&config, addr);
372                match build_result {
373                    Ok((dispatcher, bound)) => {
374                        // Register the network restore handler.
375                        if let Some(ref home) = config.env_home {
376                            let restore_server =
377                                NetworkRestoreServer::new(home.clone());
378                            dispatcher.register(
379                                RESTORE_SERVICE_NAME,
380                                Arc::new(restore_server),
381                            );
382                            log::debug!(
383                                "Node '{}' RESTORE service registered \
384                                     (env_home={})",
385                                config.node_name,
386                                home.display(),
387                            );
388                            restore_registered_init = true;
389                        }
390                        let kind =
391                            if dispatcher.is_tls() { "TLS" } else { "TCP" };
392                        log::info!(
393                            "Node '{}' {} service dispatcher started on {}",
394                            config.node_name,
395                            kind,
396                            bound
397                        );
398                        (Some(dispatcher), Some(bound))
399                    }
400                    Err(e) => {
401                        log::warn!(
402                            "Node '{}' failed to start service dispatcher \
403                             on {}: {}",
404                            config.node_name,
405                            listen_addr_str,
406                            e
407                        );
408                        (None, None)
409                    }
410                }
411            }
412            Err(e) => {
413                log::warn!(
414                    "Node '{}' cannot parse listen address '{}': {}",
415                    config.node_name,
416                    listen_addr_str,
417                    e
418                );
419                (None, None)
420            }
421        };
422
423        // Build the in-memory peer log scanner; register the peer feeder
424        // service on the dispatcher so downstream replicas can connect.
425        let peer_scanner = Arc::new(PeerLogScanner::new());
426        // F5/F31: build the acceptor state with persistence enabled when
427        // env_home is configured.  Crash-durable promises are required
428        // for the Paxos safety invariant after a process restart.
429        let election_state =
430            Arc::new(if let Some(ref home) = config.env_home {
431                ElectionAcceptorState::with_env_home(
432                    config.node_name.clone(),
433                    1,
434                    home,
435                )
436            } else {
437                ElectionAcceptorState::new(config.node_name.clone(), 1)
438            });
439        if let Some(ref dispatcher) = tcp_dispatcher {
440            let service = PeerFeederService::new(Arc::clone(&peer_scanner));
441            dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(service));
442            log::debug!(
443                "Node '{}' PEER_FEEDER service registered",
444                config.node_name,
445            );
446            // F6: register the ELECTION service so peers can run
447            // run_acceptor against this node when proposing.
448            let election_svc =
449                Arc::new(ElectionService::new(Arc::clone(&election_state)));
450            dispatcher.register(ELECTION_SERVICE_NAME, election_svc);
451            log::debug!(
452                "Node '{}' ELECTION service registered",
453                config.node_name,
454            );
455        }
456
457        let env = Self {
458            config,
459            node_state,
460            group_service,
461            vlsn_index,
462            ack_tracker,
463            stats,
464            feeders,
465            replica_stream,
466            master_tracker,
467            listeners: RwLock::new(Vec::new()),
468            shutdown: AtomicBool::new(false),
469            tcp_dispatcher,
470            bound_addr,
471            env_impl: StdMutex::new(None),
472            io_threads: StdMutex::new(Vec::new()),
473            io_shutdown: AtomicBool::new(false),
474            restore_registered: AtomicBool::new(restore_registered_init),
475            peer_scanner,
476            dtvlsn: std::sync::atomic::AtomicU64::new(0),
477            election_state,
478            self_weak: OnceLock::new(),
479            feeder_channels: StdMutex::new(HashMap::new()),
480            feeder_queues: std::sync::RwLock::new(HashMap::new()),
481            active_feeder_runners: StdMutex::new(HashMap::new()),
482            wal_vlsn_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
483        };
484
485        Ok(env)
486    }
487
488    /// Open a replicated environment with the standard production
489    /// lifecycle.
490    ///
491    /// This is the entry point recommended by the mdBook chapters:
492    /// it allocates the `ReplicatedEnvironment`, registers all
493    /// services on the TCP dispatcher, and spawns the **election
494    /// driver** background thread that runs Paxos rounds against
495    /// known peers until the node has resolved into either Master or
496    /// Replica state.
497    ///
498    /// Closes finding F6 of the 2026 review.
499    ///
500    /// Use [`ReplicatedEnvironment::new`] directly only when the
501    /// caller plans to drive state transitions explicitly (test
502    /// harnesses, scripted bootstrap, recovery tooling).
503    pub fn open(config: RepConfig) -> Result<Arc<Self>> {
504        let env = Arc::new(Self::new(config)?);
505        env.init_self_weak();
506        env.start_election_driver();
507        env.start_vlsn_persistence_daemon();
508        env.register_admin_service();
509        Ok(env)
510    }
511
512    /// Build the service dispatcher for this node.
513    ///
514    /// Phase 3 logic: when `config.transport_kind == Tls` AND
515    /// `config.tls_config` is `Some`, start a
516    /// [`crate::net::service_dispatcher::TlsTcpServiceDispatcher`] that
517    /// enforces mTLS with the configured `peer_allowlist`.  Otherwise
518    /// start the plain-TCP [`TcpServiceDispatcher`].
519    ///
520    /// Returns `(dispatcher, bound_addr)` or a `RepError` on bind / TLS
521    /// config failure.
522    fn build_dispatcher(
523        #[cfg_attr(not(feature = "tls-rustls"), allow(unused_variables))]
524        config: &RepConfig,
525        addr: SocketAddr,
526    ) -> Result<(AnyServiceDispatcher, SocketAddr)> {
527        #[cfg(feature = "tls-rustls")]
528        if config.transport_kind == crate::rep_config::RepTransportKind::Tls {
529            use crate::auth::PeerAllowlist;
530            use crate::net::service_dispatcher::TlsTcpServiceDispatcher;
531            let tls = config.tls_config.as_ref().ok_or_else(|| {
532                RepError::ConfigError(format!(
533                    "node '{}': transport_kind=Tls requires a tls_config",
534                    config.node_name,
535                ))
536            })?;
537            let allowlist =
538                PeerAllowlist::new(config.peer_allowlist.iter().cloned());
539            // Fail-closed: an empty allowlist with TLS transport is a
540            // misconfiguration. The same policy is enforced at the TLS
541            // listener and QUIC constructors; downgrading to plain TCP here
542            // would be a silent security regression for a node that asked
543            // for TLS.
544            if allowlist.is_empty() {
545                return Err(RepError::ConfigError(format!(
546                    "node '{}': transport_kind=Tls requires a non-empty \
547                     peer_allowlist (mTLS enforcement is fail-closed)",
548                    config.node_name,
549                )));
550            }
551            let disp = TlsTcpServiceDispatcher::new(addr, tls, allowlist)?;
552            let bound = disp.start()?;
553            return Ok((AnyServiceDispatcher::Tls(disp), bound));
554        }
555        // Plain-TCP dispatcher (default or when TLS config is missing).
556        let disp = TcpServiceDispatcher::new(addr).map_err(|e| {
557            RepError::NetworkError(format!("TCP dispatcher init: {e}"))
558        })?;
559        let bound = disp.start()?;
560        Ok((AnyServiceDispatcher::Plain(disp), bound))
561    }
562
563    /// Populate the env's self-referential `Weak` so background
564    /// threads can obtain a back-reference for auto-orchestrated
565    /// follow-up actions (e.g. replica auto-bootstrap on
566    /// `NeedsRestore`).  Idempotent: subsequent calls are silent
567    /// no-ops because the inner [`OnceLock`] only accepts one set.
568    ///
569    /// Callers that wrap the env in `Arc` and want auto-bootstrap
570    /// behaviour should call this immediately after construction.
571    /// `Self::open` already does so.  Test harnesses that drive
572    /// transitions manually (`RepTestBase`) also call this so the
573    /// auto-bootstrap path is exercised in tests.
574    pub fn init_self_weak(self: &Arc<Self>) {
575        let _ = self.self_weak.set(Arc::downgrade(self));
576    }
577
578    /// Register the `ADMIN` service handler on the TCP dispatcher.
579    ///
580    /// Closes findings F7 / F8.  Holds a `Weak<Self>` so the handler
581    /// does not extend the env's lifetime.  Idempotent: re-registering
582    /// is harmless because `TcpServiceDispatcher::register` overwrites
583    /// the existing handler.
584    pub fn register_admin_service(self: &Arc<Self>) {
585        if let Some(ref dispatcher) = self.tcp_dispatcher {
586            crate::group_admin::register_admin_service(
587                dispatcher,
588                Arc::downgrade(self),
589            );
590            log::debug!(
591                "Node '{}' ADMIN service registered",
592                self.config.node_name,
593            );
594        }
595    }
596
597    /// Spawn the VLSN-index persistence daemon (F11).
598    ///
599    /// Periodically (every 2 seconds) snapshots the in-memory
600    /// `VlsnIndex` to `<env_home>/vlsn.idx` so a clean restart can
601    /// resume from where the replica left off without a full network
602    /// restore.  No-op when `config.env_home` is `None`.
603    ///
604    /// Idempotent: only one daemon is ever spawned per env.
605    pub fn start_vlsn_persistence_daemon(self: &Arc<Self>) {
606        let Some(home) = self.config.env_home.clone() else {
607            return;
608        };
609        {
610            let threads = self.io_threads.lock().unwrap();
611            if threads.iter().any(|h| {
612                h.thread()
613                    .name()
614                    .is_some_and(|n| n.starts_with("noxu-vlsn-flush-"))
615            }) {
616                return;
617            }
618        }
619
620        let vlsn_index = Arc::clone(&self.vlsn_index);
621        let name = format!("noxu-vlsn-flush-{}", self.config.node_name);
622        let me = Arc::clone(self);
623        let interval = Duration::from_secs(2);
624
625        let handle = std::thread::Builder::new()
626            .name(name)
627            .spawn(move || {
628                use std::sync::atomic::Ordering;
629                let mut last_persisted_vlsn: u64 = 0;
630                while !me.io_shutdown.load(Ordering::SeqCst)
631                    && !me.is_shutdown()
632                {
633                    std::thread::sleep(interval);
634                    if me.io_shutdown.load(Ordering::SeqCst) {
635                        break;
636                    }
637                    let latest = vlsn_index.get_latest_vlsn();
638                    if latest == last_persisted_vlsn {
639                        // Nothing new to flush.
640                        continue;
641                    }
642                    // X-2: cap the flush at the last durable checkpoint's
643                    // end LSN so the persisted VLSN index never claims
644                    // VLSNs beyond the durable B-tree state.  After a crash
645                    // the recovered tree and the index will be coherent.
646                    let cap_lsn = me
647                        .env_impl
648                        .lock()
649                        .unwrap()
650                        .as_ref()
651                        .and_then(|e| e.get_checkpointer())
652                        .map(|c| c.get_last_checkpoint_end())
653                        .unwrap_or(noxu_util::NULL_LSN);
654                    match crate::vlsn::persist::flush_to_disk_capped(
655                        &vlsn_index,
656                        &home,
657                        cap_lsn,
658                    ) {
659                        Ok(n) => {
660                            log::trace!(
661                                "vlsn-flush: persisted {} entries (latest vlsn={}, cap_lsn={:?})",
662                                n,
663                                latest,
664                                cap_lsn,
665                            );
666                            last_persisted_vlsn = latest;
667                        }
668                        Err(e) => {
669                            log::warn!(
670                                "vlsn-flush: failed to persist VLSN index to {}: {}",
671                                home.display(),
672                                e
673                            );
674                        }
675                    }
676                }
677                // Final flush on shutdown so a clean close is recoverable.
678                // Cap at the last checkpoint even for the shutdown flush.
679                let cap_lsn = me
680                    .env_impl
681                    .lock()
682                    .unwrap()
683                    .as_ref()
684                    .and_then(|e| e.get_checkpointer())
685                    .map(|c| c.get_last_checkpoint_end())
686                    .unwrap_or(noxu_util::NULL_LSN);
687                if let Err(e) = crate::vlsn::persist::flush_to_disk_capped(
688                    &vlsn_index,
689                    &home,
690                    cap_lsn,
691                ) {
692                    log::warn!(
693                        "vlsn-flush (final): failed to persist VLSN index: {}",
694                        e
695                    );
696                }
697            })
698            .expect("failed to spawn noxu-vlsn-flush thread");
699
700        self.io_threads.lock().unwrap().push(handle);
701        log::debug!(
702            "Node '{}' VLSN persistence daemon started",
703            self.config.node_name,
704        );
705    }
706
707    /// Spawn the election driver background thread.
708    ///
709    /// While the env is in `Detached` or `Unknown` state and no master
710    /// is known, the driver periodically attempts a Paxos election
711    /// against peers in `GroupService` (whose ELECTION services were
712    /// registered at `open()` time).  On success the driver calls
713    /// `become_master` (if this node is the winner) or `become_replica`
714    /// (otherwise).  On failure (no quorum), the driver waits
715    /// `config.election_timeout` and tries again.
716    ///
717    /// The driver respects `io_shutdown`; on env close the loop exits
718    /// promptly.
719    ///
720    /// Idempotent: a second call is a no-op (only one driver thread is
721    /// ever spawned per env).
722    pub fn start_election_driver(self: &Arc<Self>) {
723        use std::sync::atomic::Ordering;
724        // Reuse io_shutdown for cancellation; a successful spawn is
725        // recorded by appending to io_threads, so a duplicate call
726        // would re-add a thread — we use a one-shot `AtomicBool`
727        // sentinel placed in the io_shutdown's slot via a new field.
728        // Cheaper: a static name check on io_threads is impossible;
729        // instead, gate spawning on whether any io_thread already
730        // carries the driver name.
731        {
732            let threads = self.io_threads.lock().unwrap();
733            if threads.iter().any(|h| {
734                h.thread()
735                    .name()
736                    .is_some_and(|n| n.starts_with("noxu-election-"))
737            }) {
738                return;
739            }
740        }
741
742        let me = Arc::clone(self);
743        let name = format!("noxu-election-{}", self.config.node_name);
744        let handle = std::thread::Builder::new()
745            .name(name)
746            .spawn(move || {
747                me.run_election_loop();
748            })
749            .expect("failed to spawn election driver thread");
750        self.io_threads.lock().unwrap().push(handle);
751        log::debug!("Node '{}' election driver started", self.config.node_name,);
752        // Keep ordering sane on the io_shutdown flag.
753        let _ = self.io_shutdown.load(Ordering::SeqCst);
754    }
755
756    /// Body of the election driver loop.  Public only for tests; called
757    /// by [`Self::start_election_driver`].
758    fn run_election_loop(self: Arc<Self>) {
759        use std::sync::atomic::Ordering;
760        // Maintain an internal monotonically increasing election term.
761        // Each successful or failed round bumps the term so retries do
762        // not collide with stale acceptor promises.
763        let mut term: u64 = 1;
764
765        loop {
766            if self.io_shutdown.load(Ordering::SeqCst) {
767                return;
768            }
769            if self.is_shutdown() {
770                return;
771            }
772
773            let state = self.node_state.get_state();
774            // Stop driving once we've resolved into Master/Replica;
775            // re-arm only if the node returns to Unknown.
776            if matches!(state, NodeState::Master | NodeState::Replica) {
777                std::thread::sleep(Duration::from_millis(200));
778                continue;
779            }
780            if matches!(state, NodeState::Shutdown) {
781                return;
782            }
783
784            // Probe peers for an active master via the existing
785            // GroupService cache.  In the absence of a heartbeat path
786            // we rely on master_tracker (set by become_replica from
787            // the receive loop).
788            if let Some(master_name) = self.master_tracker.get_master()
789                && master_name != self.config.node_name
790            {
791                let _ = self.become_replica(&master_name);
792                continue;
793            }
794
795            // Snapshot peers to dial for ELECTION.
796            let peers: Vec<(String, SocketAddr)> = self
797                .group_service
798                .get_all_nodes()
799                .into_iter()
800                .filter(|n| n.name != self.config.node_name)
801                .filter_map(|n| {
802                    format!("{}:{}", n.host, n.port)
803                        .parse::<SocketAddr>()
804                        .ok()
805                        .map(|a| (n.name, a))
806                })
807                .collect();
808
809            // Build the local rep group view used by run_election to
810            // compute quorum and resolve the winner name.  Include
811            // self.
812            let group = self.local_rep_group_with_self();
813
814            // Update election state for any concurrent acceptor calls.
815            let our_vlsn = self.vlsn_index.get_latest_vlsn();
816            self.election_state.set_vlsn(our_vlsn);
817            self.election_state.set_term(term);
818            // D2: advertise our DTVLSN as the major election-ranking key.
819            self.election_state.set_dtvlsn(self.get_dtvlsn());
820
821            // Connect to each peer's ELECTION service.  Failures are
822            // tolerated: a peer that doesn't answer simply contributes
823            // no vote.  The election may still reach quorum in the
824            // remaining peers.
825            let mut channels: Vec<Arc<dyn crate::net::channel::Channel>> =
826                Vec::new();
827            for (peer_name, addr) in &peers {
828                match crate::net::service_dispatcher::connect_to_service(
829                    *addr,
830                    ELECTION_SERVICE_NAME,
831                ) {
832                    Ok(ch) => {
833                        let arc: Arc<dyn crate::net::channel::Channel> =
834                            Arc::new(ch);
835                        channels.push(arc);
836                    }
837                    Err(e) => {
838                        log::trace!(
839                            "election driver: peer {} ({}) unreachable: {}",
840                            peer_name,
841                            addr,
842                            e
843                        );
844                    }
845                }
846            }
847
848            // Resolve our own node_id from the group; if not present
849            // we cannot run an election (closed-world guard — see F22).
850            let self_node_id =
851                group.get_node(&self.config.node_name).map(|n| n.node_id());
852            let self_node_id = match self_node_id {
853                Some(id) => id,
854                None => {
855                    log::warn!(
856                        "election driver: node '{}' not registered in \
857                         own group view; sleeping",
858                        self.config.node_name
859                    );
860                    std::thread::sleep(Duration::from_millis(200));
861                    continue;
862                }
863            };
864
865            log::debug!(
866                "election driver on '{}': starting term={} with {} peers",
867                self.config.node_name,
868                term,
869                channels.len(),
870            );
871            let outcome = crate::elections::paxos::run_election_with_phi_dtvlsn(
872                self_node_id,
873                &self.config.node_name,
874                &group,
875                &channels,
876                our_vlsn,
877                /* priority */ 1,
878                term,
879                /* own_dtvlsn (D2 major ranking key) */
880                self.get_dtvlsn(),
881                None,
882                std::time::Duration::from_millis(500),
883            );
884
885            match outcome {
886                Some(winner_id) if winner_id == self_node_id => {
887                    if let Err(e) = self.become_master(term) {
888                        log::warn!(
889                            "election driver: become_master failed: {}",
890                            e
891                        );
892                    } else {
893                        log::info!(
894                            "election driver: '{}' became master at term {}",
895                            self.config.node_name,
896                            term,
897                        );
898                    }
899                }
900                Some(winner_id) => {
901                    if let Some(winner_node) = group
902                        .get_nodes()
903                        .into_iter()
904                        .find(|n| n.node_id() == winner_id)
905                    {
906                        if let Err(e) = self.become_replica(&winner_node.name) {
907                            log::warn!(
908                                "election driver: become_replica failed: {}",
909                                e
910                            );
911                        } else {
912                            log::info!(
913                                "election driver: '{}' became replica of '{}' at term {}",
914                                self.config.node_name,
915                                winner_node.name,
916                                term,
917                            );
918                        }
919                    }
920                }
921                None => {
922                    log::debug!(
923                        "election driver on '{}' term={}: no quorum",
924                        self.config.node_name,
925                        term,
926                    );
927                }
928            }
929
930            term = term.saturating_add(1);
931            // Back off so we don't pin the loop on transient failures.
932            std::thread::sleep(
933                self.config.election_timeout.min(Duration::from_millis(500)),
934            );
935        }
936    }
937
938    /// Internal: a `RepGroup` snapshot that includes self.
939    fn local_rep_group_with_self(&self) -> crate::rep_group::RepGroup {
940        let mut group = self.get_rep_group();
941        // Ensure self is present in the group view; the
942        // group_service does not auto-register the local node.
943        if group.get_node(&self.config.node_name).is_none() {
944            let mut self_node = crate::rep_node::RepNode::new(
945                self.config.node_name.clone(),
946                self.config.node_type,
947                self.config.node_host.clone(),
948                self.config.node_port,
949                /* node_id */ 0,
950            );
951            // Stable self node_id derived from the name hash so
952            // re-creations in the same process don't collide.
953            use std::hash::{Hash, Hasher};
954            let mut hasher = std::collections::hash_map::DefaultHasher::new();
955            self.config.node_name.hash(&mut hasher);
956            // Restrict to a u32 range and avoid 0 (reserved for
957            // "unknown").
958            let id = ((hasher.finish() as u32) | 1).max(1);
959            self_node.node_id = id;
960            group.add_node(self_node);
961        }
962        group
963    }
964
965    /// Return the socket address the TCP service dispatcher is bound to.
966    ///
967    /// This may differ from the configured `node_port` when port 0 is used
968    /// (the OS assigns an ephemeral port). Returns `None` if the dispatcher
969    /// could not be started (e.g. the address is not resolvable).
970    pub fn bound_addr(&self) -> Option<SocketAddr> {
971        self.bound_addr
972    }
973
974    /// Wire a live `EnvironmentImpl` into this replicated environment.
975    ///
976    /// After this call, state transitions (`become_master`, `become_replica`)
977    /// will spawn real feeder/receiver I/O threads backed by the live log.
978    ///
979    /// If the RESTORE service was not registered at construction time (because
980    /// `config.env_home` was `None`), it is registered here using the
981    /// environment's actual home path.  This mirrors`RepNode.envSetup()`
982    /// which registers the restore handler during environment wiring.
983    ///
984    /// Environment reference wiring.
985    /// `EnvironmentImpl` via `RepImpl.repNode.envImpl` in HA.
986    pub fn with_environment(&self, env: Arc<EnvironmentImpl>) {
987        // Register RESTORE service lazily if not already done.
988        if !self.restore_registered.load(Ordering::SeqCst)
989            && let Some(ref dispatcher) = self.tcp_dispatcher
990        {
991            let env_home = env.get_env_home().to_path_buf();
992            let restore_server = NetworkRestoreServer::new(env_home.clone());
993            dispatcher.register(RESTORE_SERVICE_NAME, Arc::new(restore_server));
994            self.restore_registered.store(true, Ordering::SeqCst);
995            log::debug!(
996                "Node '{}' RESTORE service registered via with_environment \
997                 (env_home={})",
998                self.config.node_name,
999                env_home.display(),
1000            );
1001        }
1002
1003        // X-14: rebuild the VLSN index from recovery-replayed LN entries.
1004        // After a crash the on-disk vlsn.idx may be stale (either ahead of
1005        // the recovered B-tree, or behind if vlsn.idx was not flushed
1006        // after the last checkpoint).  Re-registering all (vlsn, lsn) pairs
1007        // from the redo pass gives a consistent in-memory index.
1008        if !env.recovery_vlsns.is_empty() {
1009            log::info!(
1010                "Node '{}': rebuilding VLSN index from {} recovered entries",
1011                self.config.node_name,
1012                env.recovery_vlsns.len(),
1013            );
1014            for &(vlsn, lsn_u64) in &env.recovery_vlsns {
1015                let lsn = noxu_util::Lsn::from_u64(lsn_u64);
1016                self.vlsn_index.register(
1017                    vlsn,
1018                    lsn.file_number(),
1019                    lsn.file_offset(),
1020                );
1021            }
1022        }
1023
1024        // X-1: truncate the VLSN index to the rollback matchpoint if recovery
1025        // detected a completed rollback period.  The matchpoint is the highest
1026        // LSN that is still valid after the rollback; entries with higher VLSNs
1027        // correspond to data that was rolled back and must not appear in the
1028        // index.
1029        if let Some(matchpoint_lsn_u64) = env.recovery_rollback_matchpoint {
1030            // Find the latest VLSN whose LSN is at or before the matchpoint.
1031            // Scan the recovered VLSN pairs (sorted ascending) to find the
1032            // boundary.
1033            let safe_vlsn = env
1034                .recovery_vlsns
1035                .iter()
1036                .rev()
1037                .find(|&&(_, lsn_u64)| lsn_u64 <= matchpoint_lsn_u64)
1038                .map(|&(vlsn, _)| vlsn)
1039                .unwrap_or(0);
1040            log::info!(
1041                "Node '{}': truncating VLSN index after vlsn={} \
1042                 (rollback matchpoint lsn={:#x})",
1043                self.config.node_name,
1044                safe_vlsn,
1045                matchpoint_lsn_u64,
1046            );
1047            self.vlsn_index.truncate_after(safe_vlsn);
1048        }
1049
1050        *self.env_impl.lock().unwrap() = Some(Arc::clone(&env));
1051
1052        // C-C2b: install the VLSN counter so log_txn_commit writes
1053        // VLSN-tagged headers.  When become_master then spawns an
1054        // EnvironmentLogScanner-backed FeederRunner, it will find these
1055        // entries and auto-feed them to replicas without any
1056        // replicate_entry call from the application.
1057        env.set_replication_vlsn_counter(Arc::clone(&self.wal_vlsn_counter));
1058    }
1059
1060    /// Get the current node state.
1061    ///
1062    ///
1063    ///
1064    /// Returns the current state of the node associated with this replication
1065    /// environment. If the caller's intent is to track the state of the node,
1066    /// `StateChangeListener` may be a more convenient and efficient approach.
1067    pub fn get_state(&self) -> NodeState {
1068        self.node_state.get_state()
1069    }
1070
1071    /// Check if this node is the master.
1072    ///
1073    /// Returns true if the node's current state is Master.
1074    pub fn is_master(&self) -> bool {
1075        self.node_state.get_state() == NodeState::Master
1076    }
1077
1078    /// Returns true if this node is an *authoritative* master (D4, JE
1079    /// `ElectionQuorum.isAuthoritativeMaster`): it is the group master AND it
1080    /// is still connected to enough replicas that, including itself, a
1081    /// SIMPLE_MAJORITY quorum is present.
1082    ///
1083    /// A master on the minority side of a network partition is NOT
1084    /// authoritative — it must not claim the special election ranking
1085    /// (`MASTER_RANKING`) nor (eventually) continue accepting writes, so the
1086    /// majority side can elect a fresh master without it competing
1087    /// (split-brain prevention).
1088    ///
1089    /// "Active replica count" = the number of currently-connected push-feeder
1090    /// runners serving *electable* peers (Monitors/Secondaries do not count
1091    /// toward the election quorum). `+ 1` for this master itself.
1092    pub fn is_authoritative_master(&self) -> bool {
1093        if !self.is_master() {
1094            return false;
1095        }
1096        let group = self.get_rep_group();
1097        // Total electable nodes (incl. self) — peers + this master.
1098        let electable_total: usize = group
1099            .get_nodes()
1100            .iter()
1101            .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
1102            .count()
1103            + 1; // +1 for self/master (not registered as a peer)
1104
1105        // Active replicas = connected feeder runners whose peer is electable.
1106        let active_electable_replicas: usize = {
1107            let runners = self.active_feeder_runners.lock().unwrap();
1108            runners
1109                .keys()
1110                .filter(|name| {
1111                    group
1112                        .get_node(name)
1113                        .map(|n| {
1114                            n.node_type == crate::node_type::NodeType::Electable
1115                        })
1116                        .unwrap_or(false)
1117                })
1118                .count()
1119        };
1120        Self::authoritative_quorum_met(
1121            active_electable_replicas,
1122            electable_total,
1123        )
1124    }
1125
1126    /// Pure SIMPLE_MAJORITY quorum check for `is_authoritative_master` (JE
1127    /// `ElectionQuorum.isAuthoritativeMaster`): `(activeReplicas + 1) >=
1128    /// quorumSize` where `quorumSize = electableTotal / 2 + 1`.
1129    fn authoritative_quorum_met(
1130        active_electable_replicas: usize,
1131        electable_total: usize,
1132    ) -> bool {
1133        let quorum_size = electable_total / 2 + 1;
1134        (active_electable_replicas + 1) >= quorum_size
1135    }
1136
1137    /// Check if this node is a replica.
1138    ///
1139    /// Returns true if the node's current state is Replica.
1140    pub fn is_replica(&self) -> bool {
1141        self.node_state.get_state() == NodeState::Replica
1142    }
1143
1144    /// Returns true if the node is currently participating in the group
1145    /// as a Replica or a Master.
1146    pub fn is_active(&self) -> bool {
1147        self.node_state.get_state().is_active()
1148    }
1149
1150    /// Get the node name.
1151    ///
1152    ///
1153    ///
1154    /// Returns the unique name used to identify this replicated environment.
1155    pub fn get_node_name(&self) -> &str {
1156        self.config.node_name.as_str()
1157    }
1158
1159    /// Get the group name.
1160    ///
1161    /// Returns the name of the replication group this node belongs to.
1162    pub fn get_group_name(&self) -> &str {
1163        self.config.group_name.as_str()
1164    }
1165
1166    /// Get the current master (if known).
1167    ///
1168    /// Returns the name of the node that is currently the master, or None
1169    /// if the master is not known (e.g. the node is in the Unknown or
1170    /// Detached state).
1171    pub fn get_master_name(&self) -> Option<String> {
1172        self.master_tracker.get_master()
1173    }
1174
1175    /// Get the replication group info.
1176    ///
1177    ///
1178    ///
1179    /// Returns a description of the replication group as known by this node.
1180    /// The replicated group metadata is stored in a replicated database and
1181    /// updates are propagated by the current master node to all replicas. If
1182    /// this node is not the master, it is possible for its description of the
1183    /// group to be out of date.
1184    pub fn get_group(&self) -> &GroupService {
1185        &self.group_service
1186    }
1187
1188    /// Add a peer node to the replication group at runtime.
1189    ///
1190    /// The node is registered in the `GroupService` so elections and quorum
1191    /// calculations immediately reflect the new membership.
1192    pub fn add_peer(&self, node: crate::rep_node::RepNode) -> Result<()> {
1193        use crate::group_service::NodeInfo;
1194        use std::time::Instant;
1195
1196        let info = NodeInfo {
1197            name: node.name.clone(),
1198            node_type: node.node_type,
1199            host: node.host.clone(),
1200            port: node.port,
1201            node_id: node.node_id,
1202            joined_at: Instant::now(),
1203            last_seen: Instant::now(),
1204            is_active: true,
1205            known_vlsn: 0,
1206            log_range: None,
1207            read_capacity_pct: node.read_capacity_pct,
1208            write_capacity_pct: node.write_capacity_pct,
1209            latency_hint_ms: node.latency_hint_ms,
1210        };
1211        self.group_service.add_node(info)?;
1212        log::info!(
1213            "Node '{}': added peer '{}' ({}:{}) to group '{}'",
1214            self.config.node_name,
1215            node.name,
1216            node.host,
1217            node.port,
1218            self.config.group_name,
1219        );
1220
1221        // F9: if we are the current master, immediately register a
1222        // `Feeder` tracker for the new peer so AckTracker bookkeeping
1223        // and downstream pull-based streaming work without a forced
1224        // re-election.
1225        if self.is_master()
1226            && (node.node_type == crate::node_type::NodeType::Electable
1227                || node.node_type == crate::node_type::NodeType::Secondary)
1228        {
1229            let mut feeders = self.feeders.write();
1230            if !feeders.iter().any(|f| f.get_replica_name() == node.name) {
1231                feeders.push(Feeder::new(node.name.clone()));
1232                log::debug!(
1233                    "Node '{}' (master): dispatched Feeder for new peer '{}'",
1234                    self.config.node_name,
1235                    node.name,
1236                );
1237            }
1238        }
1239        Ok(())
1240    }
1241
1242    /// Remove a peer node from the replication group by name.
1243    ///
1244    /// The node is deregistered from the `GroupService`.  Elections initiated
1245    /// after this call will not include the removed node in quorum calculations.
1246    pub fn remove_peer(&self, name: &str) -> Result<()> {
1247        self.group_service.remove_node(name)?;
1248        log::info!(
1249            "Node '{}': removed peer '{}' from group '{}'",
1250            self.config.node_name,
1251            name,
1252            self.config.group_name,
1253        );
1254        Ok(())
1255    }
1256
1257    /// Update the capacity and latency metadata of an existing peer.
1258    ///
1259    /// Only the following fields are updated from `node`:
1260    ///   - `read_capacity_pct`
1261    ///   - `write_capacity_pct`
1262    ///   - `latency_hint_ms`
1263    ///
1264    /// The node's identity (name, address, port, node_type) is preserved.
1265    /// Safe to call while replication is active.
1266    ///
1267    /// If the quorum policy is `Flexible` or `Expression`, the quorum system
1268    /// is rebuilt to reflect the new capacity/latency weights.
1269    ///
1270    /// # Note
1271    ///
1272    /// `update_peer_metadata` does not currently re-run
1273    /// `QuorumPolicy::validate(electable_count)` after the metadata
1274    /// change.  An LP-optimal `Expression` quorum that was safe before
1275    /// the update may no longer satisfy the intersection property
1276    /// afterwards.  Until automatic revalidation lands, deployments
1277    /// using `QuorumPolicy::Expression` should call
1278    /// `quorum_policy().validate(get_rep_group().electable_count())`
1279    /// on the returned `RepGroup` after every metadata change and
1280    /// fail the operator-facing operation if validation reports
1281    /// unsafety.
1282    pub fn update_peer_metadata(
1283        &self,
1284        name: &str,
1285        node: crate::rep_node::RepNode,
1286    ) -> Result<()> {
1287        self.group_service.update_node_metadata(
1288            name,
1289            node.read_capacity_pct,
1290            node.write_capacity_pct,
1291            node.latency_hint_ms,
1292        )?;
1293        log::info!(
1294            "Node '{}': updated metadata for peer '{}' \
1295             (read_cap={}, write_cap={}, latency={}ms)",
1296            self.config.node_name,
1297            name,
1298            node.read_capacity_pct,
1299            node.write_capacity_pct,
1300            node.latency_hint_ms,
1301        );
1302        Ok(())
1303    }
1304
1305    /// Returns a snapshot of the current replication group as a `RepGroup`.
1306    ///
1307    /// The snapshot reflects the state at the time of the call; subsequent
1308    /// `add_peer` / `remove_peer` calls are not reflected in it.
1309    pub fn get_rep_group(&self) -> crate::rep_group::RepGroup {
1310        use crate::rep_group::RepGroup;
1311
1312        let mut group = RepGroup::new(
1313            self.config.group_name.clone(),
1314            self.group_service.get_group_id(),
1315        );
1316        for info in self.group_service.get_all_nodes() {
1317            let mut node = crate::rep_node::RepNode::new(
1318                info.name.clone(),
1319                info.node_type,
1320                info.host.clone(),
1321                info.port,
1322                info.node_id,
1323            );
1324            node.read_capacity_pct = info.read_capacity_pct;
1325            node.write_capacity_pct = info.write_capacity_pct;
1326            node.latency_hint_ms = info.latency_hint_ms;
1327            group.add_node(node);
1328        }
1329        group
1330    }
1331
1332    /// Get the replication configuration.
1333    ///
1334    ///
1335    ///
1336    /// Returns the replication configuration that has been used to create this
1337    /// environment.
1338    pub fn get_config(&self) -> &RepConfig {
1339        &self.config
1340    }
1341
1342    /// Get the current VLSN range on this node.
1343    ///
1344    /// Returns the range of VLSNs currently available on this node.
1345    pub fn get_vlsn_range(&self) -> VlsnRange {
1346        self.vlsn_index.get_range()
1347    }
1348
1349    /// Get the latest VLSN.
1350    ///
1351    /// Returns the most recent VLSN registered on this node.
1352    pub fn get_current_vlsn(&self) -> u64 {
1353        self.vlsn_index.get_latest_vlsn()
1354    }
1355
1356    /// Test-only: clone the env's SHARED VLSN index `Arc`.
1357    ///
1358    /// REP-6: the replica receive loop (`become_replica` ->
1359    /// `EnvironmentLogWriter`) must feed THIS index — the one
1360    /// `get_vlsn_range`, `flush_to_disk`, and election ranking read — not a
1361    /// throwaway. Tests use this to build a writer the same way
1362    /// `become_replica` does and assert the shared index advances.
1363    #[cfg(feature = "test-harness")]
1364    pub fn vlsn_index_arc(&self) -> Arc<crate::vlsn::vlsn_index::VlsnIndex> {
1365        Arc::clone(&self.vlsn_index)
1366    }
1367
1368    /// Return the list of replica names that currently have a `Feeder`
1369    /// tracker on this (master) node.
1370    ///
1371    /// Used by tests and operator tooling.  The returned list reflects
1372    /// the master's view at the time of the call; subsequent
1373    /// `add_peer`/`remove_peer` calls may change it.
1374    pub fn feeder_replica_names(&self) -> Vec<String> {
1375        self.feeders.read().iter().map(|f| f.get_replica_name()).collect()
1376    }
1377
1378    // -----------------------------------------------------------------------
1379    // C-C2 — active push feeder API
1380    // -----------------------------------------------------------------------
1381
1382    /// Register a channel for pushing log entries to a specific replica.
1383    ///
1384    /// When [`Self::become_master`] is called — or if the node is **already
1385    /// master** — a [`FeederRunner`] background thread is immediately spawned
1386    /// for this channel.  The thread reads from a dedicated in-memory queue
1387    /// that is fed by [`Self::replicate_entry`] / [`Self::apply_entry`], and
1388    /// sends framed log entries to the replica over `channel`.  Acks sent
1389    /// back by the replica are visible via
1390    /// [`Self::active_feeder_runner_acked_vlsn`].
1391    ///
1392    /// # Production vs. test use
1393    ///
1394    /// *Production*: pass a [`crate::net::TcpChannel`] connected to the
1395    /// replica's inbound feeder service.  
1396    /// *Tests*: pass one half of a [`crate::net::LocalChannelPair`].
1397    ///
1398    /// # Note on push vs. pull
1399    ///
1400    /// Registering a channel activates the **push** path: the master
1401    /// initiates and owns the feeder connection.  The existing **pull** path
1402    /// (`PeerFeederService` / `catch_up_from_peer`) continues to operate in
1403    /// parallel for replicas that connect proactively.  Do not register a
1404    /// channel for a replica that already connects via the pull path, or
1405    /// entries may be delivered twice.
1406    ///
1407    /// If `become_master` was called *before* registering the channel, call
1408    /// this method afterward; it will spawn the FeederRunner immediately.
1409    pub fn register_feeder_channel(
1410        &self,
1411        replica_name: String,
1412        channel: Arc<dyn crate::net::Channel>,
1413    ) {
1414        {
1415            let mut ch = self.feeder_channels.lock().unwrap();
1416            ch.insert(replica_name.clone(), Arc::clone(&channel));
1417        }
1418        if self.is_master() {
1419            self.spawn_feeder_runner(replica_name, channel);
1420        }
1421    }
1422
1423    /// Return the last VLSN acknowledged by the FeederRunner for `replica_name`.
1424    ///
1425    /// Returns `0` if no FeederRunner is currently active for that replica
1426    /// (either `become_master` was not called yet, or no channel was
1427    /// registered).  Use this to poll catch-up progress before shutdown.
1428    pub fn active_feeder_runner_acked_vlsn(&self, replica_name: &str) -> u64 {
1429        self.active_feeder_runners
1430            .lock()
1431            .unwrap()
1432            .get(replica_name)
1433            .map(|r| r.known_replica_vlsn())
1434            .unwrap_or(0)
1435    }
1436
1437    /// Spawn a FeederRunner thread for `replica_name` using `channel`.
1438    ///
1439    /// Creates a dedicated `PeerLogScanner` queue for the replica, registers
1440    /// it in `feeder_queues` so that future `replicate_entry` / `apply_entry`
1441    /// calls fan out into it, spawns the `FeederRunner::run` loop, and
1442    /// records the `Arc<FeederRunner>` in `active_feeder_runners`.
1443    ///
1444    /// Idempotent: if a FeederRunner is already active for `replica_name`
1445    /// (from a prior `become_master` call), it is replaced — the old channel
1446    /// should have been closed already via `close()`.
1447    ///
1448    /// **WAL-scanner auto-feed path (C-C2b)**: when a live `EnvironmentImpl`
1449    /// has been wired via `with_environment`, the FeederRunner thread uses an
1450    /// `EnvironmentLogScanner` as its source.  Every `log_txn_commit` on the
1451    /// master writes a VLSN-tagged WAL entry (22-byte header); the scanner
1452    /// finds these entries and streams them to the replica automatically,
1453    /// without any `replicate_entry` call from the application.
1454    ///
1455    /// **Fallback path**: when no `EnvironmentImpl` is wired the runner reads
1456    /// from the in-memory `PeerLogScanner` queue populated by
1457    /// `replicate_entry` / `apply_entry` — the previous manual behaviour.
1458    fn spawn_feeder_runner(
1459        &self,
1460        replica_name: String,
1461        channel: Arc<dyn crate::net::Channel>,
1462    ) {
1463        // Dedicated entry queue: entries flowing from this master reach the
1464        // FeederRunner without competing with PeerFeederService.
1465        let queue = Arc::new(PeerLogScanner::new());
1466        {
1467            self.feeder_queues
1468                .write()
1469                .unwrap()
1470                .insert(replica_name.clone(), Arc::clone(&queue));
1471        }
1472
1473        // REP-9 Part 1: wire an ack sink so the FeederRunner forwards every
1474        // inbound replica ack to `env.record_ack(vlsn, replica_name)`, which
1475        // reaches BOTH the AckTracker (commit-blocking quorum) and the
1476        // matching `Feeder::acked_vlsn` (DTVLSN ranking).  Without this the
1477        // ack reached only the runner's private `known_replica_vlsn`.  The
1478        // sink holds a `Weak<Self>` so it never extends the env's lifetime;
1479        // if `self_weak` was never initialised we fall back to the plain
1480        // (sink-less) runner — `record_ack` is still reachable from tests.
1481        let runner = match self.self_weak.get().and_then(Weak::upgrade) {
1482            Some(env_arc) => {
1483                let weak = Arc::downgrade(&env_arc);
1484                let sink: crate::stream::feeder::AckSink =
1485                    Arc::new(move |name: &str, vlsn: u64| {
1486                        if let Some(env) = weak.upgrade() {
1487                            env.record_ack(vlsn, name);
1488                        }
1489                    });
1490                Arc::new(FeederRunner::new_with_ack_sink(
1491                    Arc::clone(&channel),
1492                    1,
1493                    replica_name.clone(),
1494                    sink,
1495                ))
1496            }
1497            None => Arc::new(FeederRunner::new(Arc::clone(&channel), 1)),
1498        };
1499        let runner_clone = Arc::clone(&runner);
1500        let replica_clone = replica_name.clone();
1501
1502        // C-C2b: prefer EnvironmentLogScanner (WAL auto-feed) when env is
1503        // wired; fall back to in-memory queue (manual replicate_entry path)
1504        // otherwise.
1505        let env_opt = self.env_impl.lock().unwrap().clone();
1506
1507        let handle = std::thread::Builder::new()
1508            .name(format!("noxu-feeder-{}", replica_name))
1509            .spawn(move || {
1510                if let Some(env) = env_opt {
1511                    if let Some(mut scanner) =
1512                        EnvironmentLogScanner::new(&env, None)
1513                    {
1514                        log::info!(
1515                            "FeederRunner for replica '{}': using \
1516                             EnvironmentLogScanner (WAL auto-feed)",
1517                            replica_clone,
1518                        );
1519                        let _ = runner_clone.run(&mut scanner);
1520                    } else {
1521                        log::warn!(
1522                            "FeederRunner for replica '{}': \
1523                             EnvironmentLogScanner unavailable, \
1524                             falling back to in-memory queue",
1525                            replica_clone,
1526                        );
1527                        let mut source = PeerScannerAdapter::new(queue, 0);
1528                        let _ = runner_clone.run(&mut source);
1529                    }
1530                } else {
1531                    let mut source = PeerScannerAdapter::new(queue, 0);
1532                    let _ = runner_clone.run(&mut source);
1533                }
1534                log::debug!(
1535                    "FeederRunner for replica '{}' exited cleanly",
1536                    replica_clone
1537                );
1538            })
1539            .expect("failed to spawn FeederRunner thread");
1540
1541        {
1542            let mut runners = self.active_feeder_runners.lock().unwrap();
1543            runners.insert(replica_name.clone(), Arc::clone(&runner));
1544        }
1545        self.io_threads.lock().unwrap().push(handle);
1546
1547        log::info!(
1548            "Node '{}' (master): FeederRunner thread spawned for replica '{}'",
1549            self.config.node_name.as_str(),
1550            replica_name,
1551        );
1552    }
1553
1554    // -----------------------------------------------------------------------
1555
1556    /// Bootstrap this node's environment by network-restoring all `.ndb`
1557    /// files from `peer_name` via the dispatcher's RESTORE service.
1558    ///
1559    /// Closes findings F2 / F4 of the 2026 review.
1560    ///
1561    /// The standalone `NetworkRestore::execute()` opens raw TCP and
1562    /// expects to drive the legacy `NetworkRestoreServer::start` listener.
1563    /// Production replicated environments host the RESTORE handler on the
1564    /// dispatcher, so this method routes through `execute_via_dispatcher`.
1565    ///
1566    /// `peer_name` must be a known peer in `GroupService`; on success the
1567    /// peer's `.ndb` files are written into `config.env_home`.  Returns
1568    /// `Err` if `env_home` is `None`, the peer is unknown, or the restore
1569    /// fails for any reason.
1570    pub fn bootstrap_via_dispatcher(&self, peer_name: &str) -> Result<()> {
1571        let env_home = self.config.env_home.clone().ok_or_else(|| {
1572            RepError::ConfigError(
1573                "bootstrap_via_dispatcher requires env_home in RepConfig"
1574                    .into(),
1575            )
1576        })?;
1577        let peer_info = self
1578            .group_service
1579            .get_all_nodes()
1580            .into_iter()
1581            .find(|n| n.name == peer_name)
1582            .ok_or_else(|| {
1583                RepError::ConfigError(format!(
1584                    "peer '{}' not registered in group '{}'",
1585                    peer_name, self.config.group_name,
1586                ))
1587            })?;
1588
1589        let cfg = NetworkRestoreConfig {
1590            source_node: peer_info.name.clone(),
1591            source_host: peer_info.host.clone(),
1592            source_port: peer_info.port,
1593            retain_log_files: true,
1594        };
1595        let restore = NetworkRestore::new(cfg).with_local_dir(env_home);
1596        restore.execute_via_dispatcher()?;
1597        log::info!(
1598            "Node '{}' bootstrapped via dispatcher from '{}' ({}:{})",
1599            self.config.node_name,
1600            peer_info.name,
1601            peer_info.host,
1602            peer_info.port,
1603        );
1604        Ok(())
1605    }
1606
1607    /// Get replication statistics.
1608    ///
1609    ///
1610    ///
1611    /// Returns statistics associated with this environment.
1612    pub fn get_stats(&self) -> &RepStats {
1613        &self.stats
1614    }
1615
1616    /// Get the ack tracker.
1617    pub fn get_ack_tracker(&self) -> &AckTracker {
1618        &self.ack_tracker
1619    }
1620
1621    /// Ensure the node state machine is in Unknown state, transitioning
1622    /// from Detached if necessary. This is needed because the state machine
1623    /// only allows Detached -> Unknown -> Master/Replica.
1624    pub fn ensure_unknown_state(&self) -> Result<()> {
1625        let current = self.node_state.get_state();
1626        match current {
1627            NodeState::Unknown => Ok(()),
1628            NodeState::Detached => {
1629                self.node_state.transition_to(NodeState::Unknown)?;
1630                Ok(())
1631            }
1632            // Master and Replica must transition through Unknown before
1633            // joining a new group or reconnecting.
1634            NodeState::Master | NodeState::Replica => {
1635                self.node_state.transition_to(NodeState::Unknown)?;
1636                Ok(())
1637            }
1638            NodeState::Shutdown => {
1639                Err(RepError::StateError("Node is shut down".to_string()))
1640            }
1641        }
1642    }
1643
1644    /// Transition to master state.
1645    ///
1646    /// Transitions this node to Master state for the given election term.
1647    /// As master, the node can accept write operations and feed log entries
1648    /// to replicas.
1649    ///
1650    /// **Active push-feeder** (C-C2): if feeder channels have been registered
1651    /// via [`Self::register_feeder_channel`] before this call, a
1652    /// [`FeederRunner`] background thread is spawned per channel.
1653    ///
1654    /// **WAL-scanner auto-feed path (C-C2b, v3.3.0)**: when
1655    /// [`Self::with_environment`] has been called before `become_master`,
1656    /// each `FeederRunner` thread uses an [`EnvironmentLogScanner`] as its
1657    /// source.  Every `log_txn_commit` on the master writes a VLSN-tagged
1658    /// 22-byte WAL entry (via `LogManager::log_with_vlsn`); the scanner
1659    /// discovers these entries and streams them to replicas automatically,
1660    /// without any [`Self::replicate_entry`] call from the application.
1661    ///
1662    /// **Fallback path**: when no `EnvironmentImpl` is wired, the runner
1663    /// reads from the in-memory queue populated by [`Self::replicate_entry`] /
1664    /// [`Self::apply_entry`].
1665    ///
1666    /// If no feeder channels are registered, this call registers per-replica
1667    /// `Feeder` tracker structs for `AckTracker` bookkeeping only.  In that
1668    /// case replicas must connect proactively to the `PEER_FEEDER` pull
1669    /// service to receive entries.
1670    pub fn become_master(&self, term: u64) -> Result<()> {
1671        if self.is_shutdown() {
1672            return Err(RepError::StateError(
1673                "Cannot become master: environment is closed".to_string(),
1674            ));
1675        }
1676
1677        // JE invariant: only `Electable` nodes can become master.  `Secondary`,
1678        // `Monitor`, and `Arbiter` are not electable and must be rejected at
1679        // the API layer (mirrors JE `ExceptionTest`).  See
1680        // `NodeType::can_be_master`.
1681        if !self.config.node_type.can_be_master() {
1682            return Err(RepError::InvalidStateTransition(format!(
1683                "node '{}' has type {} which is not electable as master",
1684                self.config.node_name.as_str(),
1685                self.config.node_type,
1686            )));
1687        }
1688
1689        // Ensure we can reach Master state (may need Detached -> Unknown first)
1690        self.ensure_unknown_state()?;
1691
1692        let old_state = self.node_state.get_state();
1693        self.node_state.transition_to(NodeState::Master)?;
1694        self.master_tracker.set_master(self.config.node_name.as_str(), term);
1695
1696        // --- F9: spawn Feeder trackers for each known replica -------------
1697        //
1698        // Closes finding F9 of the 2026 review.
1699        // The architecture is pull-based: replicas pull from the master's
1700        // `PEER_FEEDER` service via `catch_up_from_peer`.  However, the
1701        // master must:
1702        //   1. Track each replica via a `Feeder` so AckTracker bookkeeping
1703        //      can attribute replica acks to the right node.
1704        //   2. Push its own writes into `peer_scanner` so replicas pulling
1705        //      from `PEER_FEEDER` actually receive entries (`replicate_entry`).
1706        //
1707        // Here we ensure step 1: every known electable peer in the group
1708        // gets a `Feeder` entry.
1709        {
1710            let mut feeders = self.feeders.write();
1711            // Drop any stale feeders left over from a prior role.  A
1712            // `Feeder` is just an in-memory tracker; recreating it is
1713            // cheap and avoids state inversion bugs across role changes.
1714            feeders.clear();
1715            for peer in self.group_service.get_all_nodes() {
1716                if peer.name == self.config.node_name {
1717                    continue;
1718                }
1719                if peer.node_type != crate::node_type::NodeType::Electable
1720                    && peer.node_type != crate::node_type::NodeType::Secondary
1721                {
1722                    // Arbiters do not receive log entries.
1723                    continue;
1724                }
1725                feeders.push(Feeder::new(peer.name.clone()));
1726                log::debug!(
1727                    "Node '{}' (master, term={}): registered Feeder for \
1728                     replica '{}'",
1729                    self.config.node_name.as_str(),
1730                    term,
1731                    peer.name,
1732                );
1733            }
1734        }
1735
1736        // For observability, log the count.
1737        log::info!(
1738            "Node '{}' became master for term {} \
1739             (feeder trackers: {} known replicas)",
1740            self.config.node_name.as_str(),
1741            term,
1742            self.feeders.read().len(),
1743        );
1744
1745        // C-C2: spawn FeederRunner threads for pre-registered channels.
1746        //
1747        // When `register_feeder_channel` was called before `become_master`,
1748        // the channels are already in `feeder_channels`. Drain them and
1749        // spawn a FeederRunner per replica.  The FeederRunner reads from a
1750        // dedicated `PeerLogScanner` queue (populated by `replicate_entry`
1751        // fan-out) and pushes framed log entries to the replica over the
1752        // registered channel.  Acks from the replica are tracked in the
1753        // FeederRunner and visible via `active_feeder_runner_acked_vlsn`.
1754        {
1755            let channels: Vec<(String, Arc<dyn crate::net::Channel>)> = self
1756                .feeder_channels
1757                .lock()
1758                .unwrap()
1759                .iter()
1760                .map(|(k, v)| (k.clone(), Arc::clone(v)))
1761                .collect();
1762            for (replica_name, channel) in channels {
1763                self.spawn_feeder_runner(replica_name, channel);
1764            }
1765        }
1766
1767        // -------------------------------------------------------------------
1768
1769        // Notify listeners
1770        self.notify_listeners(old_state, NodeState::Master);
1771
1772        Ok(())
1773    }
1774
1775    /// Transition to replica state with the given master.
1776    ///
1777    /// Transitions this node to Replica state. The node will receive log
1778    /// entries from the specified master.
1779    ///
1780    /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
1781    /// the method prepares an `EnvironmentLogWriter` so that replicated
1782    /// entries can be written to the local log.  The actual network connection
1783    /// is established by the `TcpServiceDispatcher`; this method logs intent.
1784    ///
1785    /// In HA.
1786    pub fn become_replica(&self, master_name: &str) -> Result<()> {
1787        if self.is_shutdown() {
1788            return Err(RepError::StateError(
1789                "Cannot become replica: environment is closed".to_string(),
1790            ));
1791        }
1792
1793        // Ensure we can reach Replica state (may need Detached -> Unknown first)
1794        self.ensure_unknown_state()?;
1795
1796        let old_state = self.node_state.get_state();
1797        self.node_state.transition_to(NodeState::Replica)?;
1798        self.master_tracker.set_master(master_name, 0);
1799        self.replica_stream.set_master(master_name);
1800        self.replica_stream.set_state(
1801            crate::stream::replica_stream::ReplicaStreamState::Connecting,
1802        );
1803
1804        // --- G19: start replica receive loop --------------------------------
1805        //
1806        // Connects to the master's PEER_FEEDER service and runs a
1807        // ReplicaReceiver loop in a background thread.  The receiver writes
1808        // replicated entries via EnvironmentLogWriter.
1809        if let Some(env) = self.env_impl.lock().unwrap().clone() {
1810            if let Some(log_mgr) = env.get_log_manager() {
1811                // REP-6: feed the env's SHARED, persisted VLSN index (the one
1812                // flush_to_disk persists and get_vlsn_range / election ranking
1813                // read) into the replica receive loop — NOT a throwaway. Using
1814                // a fresh index would leave the persisted vlsn.idx, the
1815                // reported VLSN range, and the DTVLSN-ranking own_vlsn lagging
1816                // the actually-received stream, widening catch-up (or forcing
1817                // an unnecessary network restore) after a clean restart.
1818                // JE: the replica's VLSNIndex IS the environment's persisted
1819                // index (see VLSNIndex).
1820                let vlsn_index = Arc::clone(&self.vlsn_index);
1821
1822                // Resolve the master's socket address from the GroupService.
1823                let master_addr_opt: Option<SocketAddr> = self
1824                    .group_service
1825                    .get_all_nodes()
1826                    .iter()
1827                    .find(|n| n.name == master_name)
1828                    .and_then(|info| {
1829                        format!("{}:{}", info.host, info.port)
1830                            .parse::<SocketAddr>()
1831                            .ok()
1832                    });
1833
1834                let node_name = self.config.node_name.clone();
1835                let master = master_name.to_string();
1836                let vlsn_index_clone = Arc::clone(&vlsn_index);
1837                let shutdown_flag = self.io_shutdown.load(Ordering::SeqCst);
1838                // Wave 9-A fix 2: capture a Weak<Self> so the I/O thread
1839                // can call `bootstrap_via_dispatcher` automatically when
1840                // the master signals `NeedsRestore`.  When the env was
1841                // never registered with `init_self_weak` (raw
1842                // `Arc::new(Self::new(...))` without going through
1843                // `open()` or the test harness), the weak ref is `None`
1844                // and we fall back to operator-driven bootstrap.
1845                let self_weak: Option<Weak<Self>> =
1846                    self.self_weak.get().cloned();
1847
1848                let handle = std::thread::Builder::new()
1849                    .name(format!("noxu-replica-{}", node_name))
1850                    .spawn(move || {
1851                        let mut writer = EnvironmentLogWriter::new(
1852                            log_mgr,
1853                            vlsn_index_clone,
1854                        );
1855
1856                        let Some(addr) = master_addr_opt else {
1857                            log::warn!(
1858                                "noxu-replica-{}: master '{}' address not in RepGroup; \
1859                                 waiting for TCP dispatcher connection",
1860                                node_name, master,
1861                            );
1862                            return;
1863                        };
1864
1865                        // Catch-up loop: catch up, observe NeedsRestore,
1866                        // optionally auto-bootstrap, retry once.  We cap
1867                        // the retry count at MAX_AUTO_BOOTSTRAP_ATTEMPTS
1868                        // (small) so a misbehaving master does not loop
1869                        // forever consuming network bandwidth.
1870                        const MAX_AUTO_BOOTSTRAP_ATTEMPTS: u32 = 2;
1871                        let mut attempts: u32 = 0;
1872                        loop {
1873                            log::info!(
1874                                "noxu-replica-{}: connecting to master '{}' at {}",
1875                                node_name, master, addr,
1876                            );
1877                            match crate::stream::peer_feeder::catch_up_from_peer(
1878                                addr, 0, &mut writer,
1879                            ) {
1880                                Ok(true) => {
1881                                    log::info!(
1882                                        "noxu-replica-{}: catch-up complete from '{}'",
1883                                        node_name, master,
1884                                    );
1885                                    return;
1886                                }
1887                                Ok(false) => {
1888                                    // F2/F4: master signals NeedsRestore.
1889                                    // Wave 9-A fix 2: if a Weak<Self> was
1890                                    // plumbed in, upgrade it and call
1891                                    // `bootstrap_via_dispatcher` ourselves
1892                                    // so the replica auto-bootstraps and
1893                                    // resumes catch-up without operator
1894                                    // intervention.
1895                                    log::warn!(
1896                                        "noxu-replica-{}: master '{}' requires restore",
1897                                        node_name, master,
1898                                    );
1899                                    attempts += 1;
1900                                    if attempts > MAX_AUTO_BOOTSTRAP_ATTEMPTS {
1901                                        log::error!(
1902                                            "noxu-replica-{}: exceeded \
1903                                             auto-bootstrap attempts ({}); giving up",
1904                                            node_name,
1905                                            MAX_AUTO_BOOTSTRAP_ATTEMPTS,
1906                                        );
1907                                        return;
1908                                    }
1909                                    let env_arc = match self_weak
1910                                        .as_ref()
1911                                        .and_then(Weak::upgrade)
1912                                    {
1913                                        Some(e) => e,
1914                                        None => {
1915                                            // No back-ref or env dropped:
1916                                            // fall back to operator-driven
1917                                            // bootstrap and exit cleanly.
1918                                            log::warn!(
1919                                                "noxu-replica-{}: no back-reference \
1920                                                 available; operator must call \
1921                                                 bootstrap_via_dispatcher manually",
1922                                                node_name,
1923                                            );
1924                                            return;
1925                                        }
1926                                    };
1927                                    if env_arc.is_shutdown() {
1928                                        return;
1929                                    }
1930                                    log::info!(
1931                                        "noxu-replica-{}: auto-bootstrapping via \
1932                                         dispatcher from '{}' (attempt {})",
1933                                        node_name, master, attempts,
1934                                    );
1935                                    match env_arc
1936                                        .bootstrap_via_dispatcher(&master)
1937                                    {
1938                                        Ok(()) => {
1939                                            log::info!(
1940                                                "noxu-replica-{}: auto-bootstrap \
1941                                                 succeeded; resuming catch-up",
1942                                                node_name,
1943                                            );
1944                                            // Drop the strong ref before
1945                                            // re-entering catch-up so we
1946                                            // do not keep the env alive
1947                                            // longer than necessary.
1948                                            drop(env_arc);
1949                                            continue;
1950                                        }
1951                                        Err(e) => {
1952                                            log::error!(
1953                                                "noxu-replica-{}: auto-bootstrap \
1954                                                 failed: {}",
1955                                                node_name, e,
1956                                            );
1957                                            return;
1958                                        }
1959                                    }
1960                                }
1961                                Err(e) => {
1962                                    if !shutdown_flag {
1963                                        log::error!(
1964                                            "noxu-replica-{}: error from master '{}': {e}",
1965                                            node_name, master,
1966                                        );
1967                                    }
1968                                    return;
1969                                }
1970                            }
1971                        }
1972                    })
1973                    .expect("failed to spawn noxu-replica thread");
1974
1975                self.io_threads.lock().unwrap().push(handle);
1976
1977                log::debug!(
1978                    "Node '{}': replica receive thread started for master '{}'",
1979                    self.config.node_name.as_str(),
1980                    master_name,
1981                );
1982            } else {
1983                log::warn!(
1984                    "Node '{}': no LogManager available (read-only env?); \
1985                     replica I/O loop not started",
1986                    self.config.node_name.as_str(),
1987                );
1988            }
1989        }
1990        // -------------------------------------------------------------------
1991
1992        // Notify listeners
1993        self.notify_listeners(old_state, NodeState::Replica);
1994
1995        log::info!(
1996            "Node '{}' became replica of master '{}'",
1997            self.config.node_name.as_str(),
1998            master_name
1999        );
2000        Ok(())
2001    }
2002
2003    /// Initiate a master transfer to the target node.
2004    ///
2005    ///
2006    ///
2007    /// Transfers the current master state from this node to one of the
2008    /// electable replicas. The replica that is actually chosen to be the new
2009    /// master is the one with which the Master Transfer can be completed most
2010    /// rapidly. The transfer operation ensures that all changes at this node
2011    /// are available at the new master upon conclusion of the operation.
2012    pub fn transfer_master(&self, config: MasterTransferConfig) -> Result<()> {
2013        if self.is_shutdown() {
2014            return Err(RepError::StateError(
2015                "Cannot transfer master: environment is closed".to_string(),
2016            ));
2017        }
2018
2019        if !self.is_master() {
2020            return Err(RepError::InvalidState(
2021                "Master transfer can only be initiated on the master node"
2022                    .to_string(),
2023            ));
2024        }
2025
2026        log::info!(
2027            "Node '{}' initiating master transfer to '{}'",
2028            self.config.node_name.as_str(),
2029            config.target_node,
2030        );
2031
2032        // Closes finding F7 of the 2026 review.
2033        //
2034        // Steps:
2035        //   1. Locate the target's address.
2036        //   2. Compute the new term (current observed term + 1).
2037        //   3. Send TRANSFER_MASTER to the target — it will become master.
2038        //   4. Send TRANSFER_MASTER (with the same term + new master name) to
2039        //      every other peer so they re-target.
2040        //   5. Demote self to Replica of the target.
2041        //
2042        // The transfer is best-effort: a peer that doesn't ack is logged
2043        // and skipped.  The election driver will reconcile any divergence
2044        // on the next election round.
2045
2046        let target_addr = self
2047            .group_service
2048            .get_all_nodes()
2049            .into_iter()
2050            .find(|n| n.name == config.target_node)
2051            .and_then(|n| {
2052                format!("{}:{}", n.host, n.port)
2053                    .parse::<std::net::SocketAddr>()
2054                    .ok()
2055            })
2056            .ok_or_else(|| {
2057                RepError::ConfigError(format!(
2058                    "transfer_master: target '{}' not registered or has bad address",
2059                    config.target_node
2060                ))
2061            })?;
2062
2063        let new_term = self.master_tracker.get_term().saturating_add(1);
2064
2065        // 1. Tell the target to become master at the new term.
2066        let target_ack = crate::group_admin::send_transfer_master(
2067            target_addr,
2068            &config.target_node,
2069            new_term,
2070        )
2071        .map_err(|e| {
2072            RepError::NetworkError(format!(
2073                "transfer_master: failed to signal target '{}': {}",
2074                config.target_node, e
2075            ))
2076        })?;
2077        if !target_ack {
2078            return Err(RepError::StateError(format!(
2079                "transfer_master: target '{}' rejected the transfer",
2080                config.target_node
2081            )));
2082        }
2083
2084        // 2. Inform all other peers (best-effort).
2085        for peer in self.group_service.get_all_nodes() {
2086            if peer.name == self.config.node_name
2087                || peer.name == config.target_node
2088            {
2089                continue;
2090            }
2091            if let Ok(addr) = format!("{}:{}", peer.host, peer.port).parse() {
2092                let _ = crate::group_admin::send_transfer_master(
2093                    addr,
2094                    &config.target_node,
2095                    new_term,
2096                );
2097            }
2098        }
2099
2100        // 3. Demote self to Replica of the new master.
2101        self.become_replica(&config.target_node)?;
2102
2103        log::info!(
2104            "Node '{}' transferred master to '{}' at term {}",
2105            self.config.node_name.as_str(),
2106            config.target_node,
2107            new_term,
2108        );
2109        Ok(())
2110    }
2111
2112    /// Register a VLSN (as master, after writing a log entry).
2113    ///
2114    /// Maps the given VLSN to the specified log file position. This is called
2115    /// by the master after it writes a replicated log entry.
2116    pub fn register_vlsn(&self, vlsn: u64, file_number: u32, file_offset: u32) {
2117        self.vlsn_index.register(vlsn, file_number, file_offset);
2118    }
2119
2120    /// Replicate a freshly committed log entry from the master.
2121    ///
2122    /// Closes finding F9 of the 2026 review.
2123    ///
2124    /// Combines `register_vlsn` with a push into the in-memory
2125    /// `peer_scanner` so that downstream replicas pulling from this
2126    /// node's `PEER_FEEDER` service (via `catch_up_from_peer`) can
2127    /// stream the entry without round-tripping through the on-disk
2128    /// log.  The local log is still the source of truth; the peer
2129    /// scanner is a fast-path cache that bounds itself via
2130    /// `PeerLogScanner::with_capacity` so old entries are evicted.
2131    ///
2132    /// Should be called by the master after the local commit has
2133    /// fsynced.  Calling on a non-master is harmless (the peer
2134    /// scanner cache is also used by replicas) but is logged at trace
2135    /// level for diagnostics.
2136    pub fn replicate_entry(
2137        &self,
2138        vlsn: u64,
2139        file_number: u32,
2140        file_offset: u32,
2141        entry_type: u8,
2142        data: Vec<u8>,
2143    ) {
2144        // Register VLSN -> LSN, dispatching entry type so lastSync /
2145        // lastTxnEnd advance (REP-5; JE VLSNRange.getUpdateForNewMapping).
2146        // An unknown type byte falls back to extend-only registration.
2147        match noxu_log::LogEntryType::from_type_num(entry_type) {
2148            Some(et) => self.vlsn_index.register_with_type(
2149                vlsn,
2150                file_number,
2151                file_offset,
2152                et,
2153            ),
2154            None => self.vlsn_index.register(vlsn, file_number, file_offset),
2155        }
2156        // Pull path: shared peer_scanner serves replicas connecting via
2157        // PeerFeederService (catch_up_from_peer).
2158        self.peer_scanner.push(vlsn, entry_type, data.clone());
2159        // Push path (C-C2): fan out to per-replica FeederRunner queues so
2160        // that threads spawned by become_master can stream entries to each
2161        // registered replica without competing with PeerFeederService.
2162        {
2163            let queues = self.feeder_queues.read().unwrap();
2164            for queue in queues.values() {
2165                queue.push(vlsn, entry_type, data.clone());
2166            }
2167        }
2168        if !self.is_master() {
2169            log::trace!(
2170                "replicate_entry called on non-master node '{}': vlsn={}, type={}",
2171                self.config.node_name,
2172                vlsn,
2173                entry_type,
2174            );
2175        }
2176    }
2177
2178    /// Apply a replicated entry (as replica).
2179    ///
2180    /// Applies a log entry received from the master. This is called by the
2181    /// replica stream handler after receiving an entry from the feeder.
2182    ///
2183    /// `data` is the wire-encoded log-record payload.  When the
2184    /// replicated environment has not been wired to a local
2185    /// `noxu_db::Environment` (i.e., before `with_environment` is
2186    /// called) the payload is forwarded into the in-memory peer
2187    /// scanner so that downstream replicas attached to the
2188    /// `PEER_FEEDER` service can re-stream it; the local log is **not**
2189    /// updated.  This is documented behaviour rather than a stub — see
2190    /// the 2026 review finding #26 (medium) for the
2191    /// `with_environment`-required local-apply path.
2192    /// cleanup (rep info F35: `_data` placeholder) renames the leading
2193    /// underscore so reviewers don't read it as a TODO.
2194    pub fn apply_entry(
2195        &self,
2196        vlsn: u64,
2197        entry_type: u8,
2198        data: Vec<u8>,
2199    ) -> Result<()> {
2200        if self.is_shutdown() {
2201            return Err(RepError::StateError(
2202                "Cannot apply entry: environment is closed".to_string(),
2203            ));
2204        }
2205
2206        // Register the VLSN in the index, dispatching entry type so
2207        // lastSync/lastTxnEnd advance (REP-5; JE
2208        // VLSNRange.getUpdateForNewMapping).
2209        match noxu_log::LogEntryType::from_type_num(entry_type) {
2210            Some(et) => self.vlsn_index.register_with_type(vlsn, 0, 0, et),
2211            None => self.vlsn_index.register(vlsn, 0, 0),
2212        }
2213
2214        // Push into the peer log scanner so downstream replicas can
2215        // receive this entry via the PEER_FEEDER service.
2216        self.peer_scanner.push(vlsn, entry_type, data.clone());
2217        // C-C2 push path: fan out to per-replica FeederRunner queues.
2218        {
2219            let queues = self.feeder_queues.read().unwrap();
2220            for queue in queues.values() {
2221                queue.push(vlsn, entry_type, data.clone());
2222            }
2223        }
2224
2225        log::trace!(
2226            "Applied replicated entry: vlsn={}, type={}",
2227            vlsn,
2228            entry_type
2229        );
2230        Ok(())
2231    }
2232
2233    /// Record an ack from a replica (as master).
2234    ///
2235    /// Records that the specified replica has acknowledged processing up to
2236    /// the given VLSN. This is used by the master to track durability
2237    /// guarantees.
2238    pub fn record_ack(&self, vlsn: u64, replica_name: &str) {
2239        // Only acks from ELECTABLE replicas count toward the durability
2240        // quorum (JE DurabilityQuorum.replicaAcksQualify: Monitors and
2241        // Secondaries do not qualify). An ack from a non-electable / unknown
2242        // node is recorded for stats elsewhere but must not satisfy the
2243        // ReplicaAckPolicy. If the node is unknown to the group view we err
2244        // toward NOT counting it.
2245        let qualifies = self
2246            .get_rep_group()
2247            .get_node(replica_name)
2248            .map(|n| n.node_type().is_electable())
2249            .unwrap_or(false);
2250        if qualifies {
2251            self.ack_tracker.record_ack(vlsn, replica_name);
2252        }
2253        // REP-9 Part 1: advance the matching `Feeder::acked_vlsn` high-water
2254        // mark (read by `update_dtvlsn_from_feeders` and exposed via
2255        // `get_acked_vlsn`).  The production `FeederRunner` previously updated
2256        // only its private `known_replica_vlsn`, so the DTVLSN ranking never
2257        // saw production progress (JE `Feeder.getReplicaTxnEndVLSN`).  We
2258        // record the high-water for *any* replica (electable or not); the
2259        // electable filter is reapplied when DTVLSN/quorum is computed.
2260        for feeder in self.feeders.read().iter() {
2261            if feeder.get_replica_name() == replica_name {
2262                feeder.record_ack(vlsn);
2263                break;
2264            }
2265        }
2266        // Recompute the DTVLSN from feeder progress whenever an ack lands.
2267        self.update_dtvlsn_from_feeders();
2268        // REP-9: wake any committer parked in `await_replica_acks`. Its
2269        // satisfaction predicate is the high-water feeder count, not an
2270        // exact-VLSN registration, so we must notify unconditionally (the
2271        // AckTracker's own `record_ack` only notifies when the exact VLSN was
2272        // registered, which the per-frame feeder acks generally are not).
2273        self.ack_tracker.notify_waiters();
2274    }
2275
2276    /// Returns the current Durable Transaction VLSN (D7, JE RepNode.getDTVLSN).
2277    /// The highest VLSN replicated to a majority of electable replicas; 0 if
2278    /// none yet. Used by the election ranking so the most-durable node wins.
2279    pub fn get_dtvlsn(&self) -> u64 {
2280        self.dtvlsn.load(std::sync::atomic::Ordering::Acquire)
2281    }
2282
2283    /// Advance the DTVLSN to `candidate` if it is greater (JE
2284    /// RepNode.updateDTVLSN — an `AtomicLongMax.updateMax`). The DTVLSN can
2285    /// only move forward. Returns the resulting (possibly unchanged) value.
2286    pub fn update_dtvlsn(&self, candidate: u64) -> u64 {
2287        use std::sync::atomic::Ordering;
2288        let mut cur = self.dtvlsn.load(Ordering::Acquire);
2289        while candidate > cur {
2290            match self.dtvlsn.compare_exchange_weak(
2291                cur,
2292                candidate,
2293                Ordering::AcqRel,
2294                Ordering::Acquire,
2295            ) {
2296                Ok(_) => return candidate,
2297                Err(observed) => cur = observed,
2298            }
2299        }
2300        cur
2301    }
2302
2303    /// Set the DTVLSN from the replication stream (JE RepNode.setDTVLSN —
2304    /// used exclusively by the replica, which maintains the DTVLSN from
2305    /// commit/abort records). Still enforced as advance-only via update_max so
2306    /// an out-of-order or stale record cannot move it backward.
2307    pub fn set_dtvlsn(&self, vlsn: u64) {
2308        self.update_dtvlsn(vlsn);
2309    }
2310
2311    /// Master-side DTVLSN computation (D7, JE FeederManager.updateDTVLSN):
2312    /// across the *qualifying* (electable) feeders whose replica-txn-end VLSN
2313    /// exceeds the current DTVLSN, take the minimum; once a SIMPLE_MAJORITY
2314    /// ack-count of them exceeds the current value, advance the DTVLSN to that
2315    /// minimum (a transaction is durable once a majority hold it).
2316    fn update_dtvlsn_from_feeders(&self) {
2317        if !self.is_master() {
2318            return;
2319        }
2320        let curr = self.get_dtvlsn();
2321
2322        // SIMPLE_MAJORITY required-ack-count over the electable group,
2323        // computed the same way as await_replica_acks.
2324        let group = self.get_rep_group();
2325        let electable_peers: u32 = group
2326            .get_nodes()
2327            .iter()
2328            .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2329            .count() as u32;
2330        let electable_count = electable_peers + 1; // +1 for self/master
2331        // required electable acks for SIMPLE_MAJORITY = floor(n/2) replicas
2332        // (the master self-acks; a majority is reached when this many peers
2333        // also hold the VLSN).
2334        let durable_ack_count = electable_count / 2;
2335        if durable_ack_count == 0 {
2336            // Single-node (or majority is self alone): the master's own log is
2337            // immediately durable up to its latest VLSN.
2338            self.update_dtvlsn(self.get_current_vlsn());
2339            return;
2340        }
2341
2342        let mut min = u64::MAX;
2343        let mut ack_count: u32 = 0;
2344        for feeder in self.feeders.read().iter() {
2345            // replicaAcksQualify: only electable feeders count (D6).
2346            let qualifies = group
2347                .get_node(&feeder.get_replica_name())
2348                .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2349                .unwrap_or(false);
2350            if !qualifies {
2351                continue;
2352            }
2353            let replica_vlsn = feeder.get_acked_vlsn();
2354            if replica_vlsn <= curr {
2355                continue;
2356            }
2357            if replica_vlsn < min {
2358                min = replica_vlsn;
2359            }
2360            ack_count += 1;
2361            if ack_count >= durable_ack_count {
2362                // A majority of electable replicas hold >= min: durable.
2363                self.update_dtvlsn(min);
2364                return;
2365            }
2366        }
2367        // DTVLSN unchanged.
2368    }
2369
2370    /// REP-9: count qualifying (electable) feeders whose acked high-water VLSN
2371    /// is `>= commit_vlsn`.  This is the Rust equivalent of JE
2372    /// `FeederManager.getNumCurrentAckFeeders(commitVLSN)` — the durability
2373    /// quorum is satisfied when this count reaches the required ack count.
2374    /// Only Electable replicas qualify (D6, JE
2375    /// `DurabilityQuorum.replicaAcksQualify`).
2376    fn count_ack_feeders_ge(&self, commit_vlsn: u64) -> u32 {
2377        let group = self.get_rep_group();
2378        let mut count = 0u32;
2379        for feeder in self.feeders.read().iter() {
2380            let qualifies = group
2381                .get_node(&feeder.get_replica_name())
2382                .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2383                .unwrap_or(false);
2384            // A feeder counts only if it has acked a *real* VLSN at or above
2385            // the commit VLSN.  `acked_vlsn == 0` is the NULL sentinel (no ack
2386            // yet) and must never satisfy a commit, even when `commit_vlsn`
2387            // itself is 0 (no replicated commit logged) — mirrors JE
2388            // `getReplicaTxnEndVLSN()` returning NULL_VLSN for a fresh feeder,
2389            // which is not `>=` any commit VLSN.
2390            let acked = feeder.get_acked_vlsn();
2391            if qualifies && acked > 0 && acked >= commit_vlsn {
2392                count += 1;
2393            }
2394        }
2395        count
2396    }
2397
2398    /// Set the state change listener.
2399    ///
2400    ///
2401    ///
2402    /// Sets the listener used to receive asynchronous replication node state
2403    /// change events. Note that there is one listener per replication node,
2404    /// not one per handle. Invoking this method adds to the set of listeners.
2405    ///
2406    /// Invoking this method typically results in an immediate callback to the
2407    /// application via the `on_state_change` method, so that the application
2408    /// is made aware of the existing state of the node at the time the listener
2409    /// is first established.
2410    pub fn set_state_change_listener(
2411        &self,
2412        listener: Arc<dyn StateChangeListener>,
2413    ) {
2414        // Immediately notify the listener of the current state
2415        let current_state = self.node_state.get_state();
2416        let event = StateChangeEvent::new(
2417            current_state,
2418            current_state,
2419            self.get_master_name(),
2420        );
2421        listener.on_state_change(event);
2422
2423        let mut listeners = self.listeners.write();
2424        listeners.push(listener);
2425    }
2426
2427    /// Close the replicated environment.
2428    ///
2429    ///
2430    ///
2431    /// Closes this handle and releases any resources. When closed, daemon
2432    /// threads are stopped, even if they are performing work. The node ceases
2433    /// participation in the replication group. If the node was currently the
2434    /// master, the rest of the group will hold an election.
2435    ///
2436    /// The ReplicatedEnvironment should not be closed while any other type of
2437    /// handle that refers to it is not yet closed.
2438    pub fn close(&self) -> Result<()> {
2439        if self.shutdown.swap(true, Ordering::SeqCst) {
2440            // Already closed
2441            return Ok(());
2442        }
2443
2444        let old_state = self.node_state.get_state();
2445
2446        // Transition to Shutdown state. The state machine allows this from
2447        // any non-Shutdown state.
2448        let _ = self.node_state.transition_to(NodeState::Shutdown);
2449
2450        // Notify listeners of the shutdown
2451        self.notify_listeners(old_state, NodeState::Shutdown);
2452
2453        // Clear feeders
2454        {
2455            let mut feeders = self.feeders.write();
2456            feeders.clear();
2457        }
2458
2459        // C-C2: close all registered feeder channels so FeederRunner threads
2460        // observe ChannelClosed and exit their run() loops cleanly.
2461        {
2462            let channels = self.feeder_channels.lock().unwrap();
2463            for (name, ch) in channels.iter() {
2464                if let Err(e) = ch.close() {
2465                    log::debug!(
2466                        "close: feeder channel for '{}' already closed: {}",
2467                        name,
2468                        e
2469                    );
2470                }
2471            }
2472        }
2473        // Drop all active runners and queues so their Arcs release.
2474        self.active_feeder_runners.lock().unwrap().clear();
2475        self.feeder_queues.write().unwrap().clear();
2476
2477        // Signal and join all I/O threads spawned by become_master /
2478        // become_replica / start_vlsn_persistence_daemon.  The vlsn-flush
2479        // thread does a final flush on its way out so a clean close is
2480        // recoverable.  Closes finding F11.
2481        self.io_shutdown.store(true, Ordering::SeqCst);
2482        {
2483            let mut threads = self.io_threads.lock().unwrap();
2484            for handle in threads.drain(..) {
2485                let _ = handle.join();
2486            }
2487        }
2488
2489        // Belt-and-braces: even when no daemon is running (e.g.
2490        // `ReplicatedEnvironment::new` without `open`), persist a final
2491        // snapshot if env_home is configured.
2492        if let Some(ref home) = self.config.env_home
2493            && let Err(e) =
2494                crate::vlsn::persist::flush_to_disk(&self.vlsn_index, home)
2495        {
2496            log::warn!(
2497                "close: failed to persist VLSN index to {}: {}",
2498                home.display(),
2499                e
2500            );
2501        }
2502
2503        // Stop the service dispatcher (the: serviceDispatcher.shutdown()).
2504        if let Some(ref dispatcher) = self.tcp_dispatcher {
2505            dispatcher.stop();
2506            let kind = if dispatcher.is_tls() { "TLS" } else { "TCP" };
2507            log::debug!(
2508                "Node '{}' {} service dispatcher stopped",
2509                self.config.node_name.as_str(),
2510                kind,
2511            );
2512        }
2513
2514        log::info!(
2515            "Replicated environment '{}' in group '{}' closed",
2516            self.config.node_name.as_str(),
2517            self.config.group_name.as_str()
2518        );
2519
2520        Ok(())
2521    }
2522
2523    /// Close this handle and shut down the Replication Group by forcing all
2524    /// active Replicas to exit.
2525    ///
2526    ///
2527    ///
2528    /// This method must be invoked on the node that's currently the Master
2529    /// after all other outstanding handles have been closed.
2530    ///
2531    /// When push-feeder threads are active (registered via
2532    /// [`Self::register_feeder_channel`]), the master first waits up to half
2533    /// of `replica_shutdown_timeout_ms` for each FeederRunner replica to
2534    /// acknowledge all outstanding log entries (VLSN catch-up).  Replicas
2535    /// that do not catch up within the budget receive a warning; the master
2536    /// proceeds to send `SHUTDOWN_GROUP` regardless.  This closes finding M-4
2537    /// of the v3.x production-readiness review.
2538    ///
2539    /// Replicas that are not fed via a registered channel (pull-based
2540    /// `PeerFeederService` path) are sent `SHUTDOWN_GROUP` without a
2541    /// VLSN-level catch-up wait — that wait requires per-replica ack tracking
2542    /// which the pull path does not yet provide.
2543    pub fn shutdown_group(
2544        &self,
2545        replica_shutdown_timeout_ms: u64,
2546    ) -> Result<()> {
2547        if !self.is_master() {
2548            return Err(RepError::InvalidState(
2549                "shutdownGroup must be invoked on the master".to_string(),
2550            ));
2551        }
2552
2553        log::info!(
2554            "Node '{}' shutting down replication group '{}' (replica_timeout={}ms)",
2555            self.config.node_name.as_str(),
2556            self.config.group_name.as_str(),
2557            replica_shutdown_timeout_ms,
2558        );
2559
2560        // M-4: Wait for active FeederRunner replicas to ack the master's
2561        // current VLSN before sending SHUTDOWN_GROUP.  We allow up to half
2562        // the overall timeout for the catch-up phase so the second half
2563        // remains for the SHUTDOWN_GROUP send loop.
2564        let catchup_budget_ms = replica_shutdown_timeout_ms / 2;
2565        if catchup_budget_ms > 0 {
2566            let master_vlsn = self.vlsn_index.get_range().last();
2567            if master_vlsn > 0 {
2568                let runners: Vec<(String, Arc<FeederRunner>)> = self
2569                    .active_feeder_runners
2570                    .lock()
2571                    .unwrap()
2572                    .iter()
2573                    .map(|(k, v)| (k.clone(), Arc::clone(v)))
2574                    .collect();
2575                if !runners.is_empty() {
2576                    let catchup_deadline = std::time::Instant::now()
2577                        + Duration::from_millis(catchup_budget_ms);
2578                    for (name, runner) in &runners {
2579                        loop {
2580                            let acked = runner.known_replica_vlsn();
2581                            if acked >= master_vlsn
2582                                || std::time::Instant::now() >= catchup_deadline
2583                            {
2584                                if acked < master_vlsn {
2585                                    log::warn!(
2586                                        "shutdown_group: replica '{}' acked \
2587                                         VLSN {} < master VLSN {}; proceeding",
2588                                        name,
2589                                        acked,
2590                                        master_vlsn,
2591                                    );
2592                                } else {
2593                                    log::info!(
2594                                        "shutdown_group: replica '{}' caught up \
2595                                         to VLSN {}",
2596                                        name,
2597                                        acked,
2598                                    );
2599                                }
2600                                break;
2601                            }
2602                            std::thread::sleep(Duration::from_millis(10));
2603                        }
2604                    }
2605                }
2606            }
2607        }
2608
2609        // Closes finding F8 of the 2026 review.
2610        //
2611        // Send SHUTDOWN_GROUP to every known peer.  The recipient calls
2612        // its own `close()` and the per-connection ADMIN handler
2613        // returns ACK_OK.  Any peer that doesn't ack within the
2614        // timeout is logged and the master proceeds.  After signalling
2615        // every peer, the master closes its own env.
2616        let deadline = std::time::Instant::now()
2617            + Duration::from_millis(replica_shutdown_timeout_ms);
2618
2619        for peer in self.group_service.get_all_nodes() {
2620            if peer.name == self.config.node_name {
2621                continue;
2622            }
2623            // Don't exceed the deadline waiting for any single peer.
2624            let now = std::time::Instant::now();
2625            if now >= deadline {
2626                log::warn!(
2627                    "shutdown_group: deadline reached; skipping remaining peers"
2628                );
2629                break;
2630            }
2631            let addr_str = format!("{}:{}", peer.host, peer.port);
2632            let addr = match addr_str.parse::<SocketAddr>() {
2633                Ok(a) => a,
2634                Err(e) => {
2635                    log::warn!(
2636                        "shutdown_group: peer '{}' has bad address {}: {}",
2637                        peer.name,
2638                        addr_str,
2639                        e
2640                    );
2641                    continue;
2642                }
2643            };
2644            match crate::group_admin::send_shutdown_group(addr) {
2645                Ok(true) => log::info!(
2646                    "shutdown_group: peer '{}' acknowledged",
2647                    peer.name
2648                ),
2649                Ok(false) => log::warn!(
2650                    "shutdown_group: peer '{}' rejected the request",
2651                    peer.name
2652                ),
2653                Err(e) => log::warn!(
2654                    "shutdown_group: peer '{}' unreachable: {}",
2655                    peer.name,
2656                    e
2657                ),
2658            }
2659        }
2660
2661        // Master closes itself last.
2662        self.close()
2663    }
2664
2665    /// Check if shutdown is in progress.
2666    pub fn is_shutdown(&self) -> bool {
2667        self.shutdown.load(Ordering::SeqCst)
2668    }
2669
2670    /// Notify all registered listeners of a state change.
2671    fn notify_listeners(&self, old_state: NodeState, new_state: NodeState) {
2672        let listeners = self.listeners.read();
2673        if !listeners.is_empty() {
2674            let event = StateChangeEvent::new(
2675                old_state,
2676                new_state,
2677                self.get_master_name(),
2678            );
2679            for listener in listeners.iter() {
2680                listener.on_state_change(event.clone());
2681            }
2682        }
2683    }
2684}
2685
2686// ---------------------------------------------------------------------------
2687// F1: ReplicaAckCoordinator impl wires master commits into the AckTracker.
2688// ---------------------------------------------------------------------------
2689//
2690// `noxu_db::Transaction::commit_with_durability` calls
2691// `await_replica_acks` after the local WAL fsync.  This impl:
2692//
2693//   1. Rejects calls on a non-master node with `NotMaster`.
2694//   2. Rejects calls during shutdown with `Shutdown`.
2695//   3. Computes the required ack count from `electable_count` and the
2696//      requested policy.
//   4. Takes the latest assigned VLSN as the commit VLSN, registers the
//      ack requirement on the `AckTracker`, and blocks on the tracker's
//      condvar until the policy is satisfied, the timeout elapses, or
//      shutdown is signalled.
2701//   5. Cleans up the tracker entry on every exit path.
2702//
2703// Closes finding F1 of the 2026 review.
2704impl ReplicaAckCoordinator for ReplicatedEnvironment {
2705    fn await_replica_acks(
2706        &self,
2707        policy: ReplicaAckPolicyKind,
2708        timeout: Duration,
2709    ) -> std::result::Result<u32, AckWaitError> {
2710        // Fast-path: ReplicaAckPolicy::None never blocks. The trait spec
2711        // says callers may already short-circuit, but be defensive.
2712        if matches!(policy, ReplicaAckPolicyKind::None) {
2713            return Ok(0);
2714        }
2715
2716        if self.is_shutdown() {
2717            return Err(AckWaitError {
2718                kind: AckWaitErrorKind::Shutdown,
2719                needed: 0,
2720                received: 0,
2721            });
2722        }
2723
2724        if !self.is_master() {
2725            return Err(AckWaitError {
2726                kind: AckWaitErrorKind::NotMaster,
2727                needed: 0,
2728                received: 0,
2729            });
2730        }
2731
2732        // Count electable peers (excluding the master) using the
2733        // RepGroup view, which counts Arbiters and Electables
2734        // identically. Only Electable nodes are counted as data
2735        // replicas able to ack a commit.  The master itself is
2736        // *implicit*: it is not registered in `group_service` (only
2737        // peers are), so we add 1 to obtain the total electable
2738        // count expected by `ReplicaAckPolicyKind::required_acks`.
2739        let group = self.get_rep_group();
2740        let electable_peers: u32 = group
2741            .get_nodes()
2742            .iter()
2743            .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2744            .count() as u32;
2745        let electable_count: u32 = electable_peers + 1; // +1 for self/master
2746
2747        let needed = policy.required_acks(electable_count);
2748        if needed == 0 {
2749            // Single-node group, or All with only the master itself.
2750            return Ok(0);
2751        }
2752
2753        // REP-9 Part 2: the commit's VLSN is the key.  The master assigns a
2754        // VLSN when it logs the TxnCommit (via the shared `wal_vlsn_counter`
2755        // bumped in `EnvironmentImpl::log_txn_commit`), immediately before
2756        // this gate runs.  The latest assigned VLSN therefore IS this
2757        // commit's VLSN (the trait contract: "implementations are responsible
2758        // for assigning the commit VLSN internally").  We wait until a quorum
2759        // of qualifying electable replicas have acked a VLSN >= the commit
2760        // VLSN — faithful to JE `FeederManager.getNumCurrentAckFeeders`, which
2761        // counts feeders whose `getReplicaTxnEndVLSN() >= commitVLSN` (a
2762        // high-water `>=` test, NOT an exact-VLSN match).
2763        //
2764        // ponytail: reads the global high-water VLSN, so a concurrent later
2765        // commit can make this gate wait on a slightly higher VLSN than its
2766        // own. That is strictly SAFE (waiting for >= a newer VLSN never
2767        // returns early) and only marginally less precise; thread the
2768        // per-txn VLSN through the trait if exact per-commit granularity is
2769        // ever needed.
2770        let commit_vlsn = self.wal_vlsn_counter.load(Ordering::Acquire);
2771
2772        // Register on the AckTracker too: this is what `record_ack` notifies,
2773        // so the condvar wakes us as acks land.  The satisfaction decision
2774        // itself is the high-water feeder count below.
2775        self.ack_tracker.register(commit_vlsn, needed);
2776
2777        // Block on the ack condvar until a quorum of electable feeders hold
2778        // the commit VLSN, the timeout elapses, or shutdown is signalled — no
2779        // spin-poll (JE FeederTxns.TxnInfo uses a per-transaction
2780        // CountDownLatch.await; the AckTracker condvar is the shared-mutex
2781        // equivalent). record_ack notifies us as acks arrive.
2782        let satisfied = self.ack_tracker.wait_for_predicate(
2783            timeout,
2784            || self.count_ack_feeders_ge(commit_vlsn) >= needed,
2785            || self.is_shutdown(),
2786        );
2787        if satisfied {
2788            self.ack_tracker.cleanup_through(commit_vlsn);
2789            return Ok(needed);
2790        }
2791        if self.is_shutdown() {
2792            self.ack_tracker.cleanup_through(commit_vlsn);
2793            return Err(AckWaitError {
2794                kind: AckWaitErrorKind::Shutdown,
2795                needed,
2796                received: 0,
2797            });
2798        }
2799        // Timed out: report the partial ack count (qualifying electable
2800        // feeders holding the commit VLSN) so the caller can surface
2801        // InsufficientReplicas.
2802        let received = self.count_ack_feeders_ge(commit_vlsn);
2803        self.ack_tracker.cleanup_through(commit_vlsn);
2804        Err(AckWaitError { kind: AckWaitErrorKind::Timeout, needed, received })
2805    }
2806
2807    /// X-3: allocate the next VLSN for a recovered XA commit and register
2808    /// `lsn` in the VLSN index so feeders can stream the commit.
2809    ///
2810    /// Increments off the current latest VLSN so the new VLSN is strictly
2811    /// monotonically increasing.  In a single-node or master-less environment
2812    /// (not master) returns 0 (NULL_VLSN — harmless, the default).
2813    fn alloc_vlsn_for_recovered_commit(&self, lsn: noxu_util::Lsn) -> u64 {
2814        // Only allocate a VLSN when we are the master; on a replica the
2815        // recovered XA should have been replicated by the original master.
2816        if !self.is_master() {
2817            return 0;
2818        }
2819        let next_vlsn = self.vlsn_index.get_latest_vlsn() + 1;
2820        // A recovered XA commit is a commit log entry; dispatch as TxnCommit
2821        // so lastTxnEnd/lastSync advance (REP-5).
2822        self.vlsn_index.register_with_type(
2823            next_vlsn,
2824            lsn.file_number(),
2825            lsn.file_offset(),
2826            noxu_log::LogEntryType::TxnCommit,
2827        );
2828        log::debug!(
2829            "alloc_vlsn_for_recovered_commit: allocated vlsn={} for lsn={:?}",
2830            next_vlsn,
2831            lsn
2832        );
2833        next_vlsn
2834    }
2835
2836    /// R-3: pre-allocate the next commit VLSN WITHOUT registering in the index.
2837    ///
2838    /// The caller writes the `TxnCommit` WAL entry with this VLSN embedded,
2839    /// then calls `register_recovered_commit_vlsn` with the actual commit LSN.
2840    /// This two-step approach ensures the WAL entry carries the VLSN so the
2841    /// X-14 VLSN rebuild on second crash can find it.
2842    fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64 {
2843        if !self.is_master() {
2844            return 0;
2845        }
2846        // Peek at the next VLSN without registering.  The actual registration
2847        // happens in register_recovered_commit_vlsn() after the WAL write.
2848        self.vlsn_index.get_latest_vlsn() + 1
2849    }
2850
2851    /// R-3: register a pre-allocated VLSN in the VLSN index with the actual
2852    /// commit LSN.  Called after writing the `TxnCommit` WAL entry.
2853    fn register_recovered_commit_vlsn(
2854        &self,
2855        vlsn: u64,
2856        commit_lsn: noxu_util::Lsn,
2857    ) {
2858        if vlsn == 0 || !self.is_master() {
2859            return;
2860        }
2861        // The pre-allocated VLSN is for a TxnCommit WAL entry; dispatch the
2862        // type so lastTxnEnd/lastSync advance (REP-5).
2863        self.vlsn_index.register_with_type(
2864            vlsn,
2865            commit_lsn.file_number(),
2866            commit_lsn.file_offset(),
2867            noxu_log::LogEntryType::TxnCommit,
2868        );
2869        log::debug!(
2870            "register_recovered_commit_vlsn: registered vlsn={} for commit_lsn={:?}",
2871            vlsn,
2872            commit_lsn
2873        );
2874    }
2875}
2876
2877#[cfg(test)]
2878mod tests {
2879    use super::*;
2880    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};
2881
2882    /// Helper to create a test config with a fixed port (unit-test style,
2883    /// no real TCP bind needed — hostname "localhost" resolves but the port
2884    /// might be in use; use `test_config_port0` for real TCP tests).
2885    fn test_config(node_name: &str) -> RepConfig {
2886        RepConfig::builder("test_group", node_name, "localhost")
2887            .node_port(5001)
2888            .build()
2889    }
2890
2891    /// Helper to create a test config that binds to an OS-assigned port.
2892    fn test_config_port0(node_name: &str) -> RepConfig {
2893        RepConfig::builder("test_group", node_name, "127.0.0.1")
2894            .node_port(0)
2895            .build()
2896    }
2897
2898    #[test]
2899    fn test_initial_state_is_detached() {
2900        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2901        // NodeStateMachine starts in Detached state
2902        assert_eq!(env.get_state(), NodeState::Detached);
2903        assert!(!env.is_master());
2904        assert!(!env.is_replica());
2905        assert!(!env.is_active());
2906    }
2907
2908    #[test]
2909    fn test_become_master() {
2910        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2911        env.become_master(1).unwrap();
2912        assert_eq!(env.get_state(), NodeState::Master);
2913        assert!(env.is_master());
2914        assert!(!env.is_replica());
2915        assert!(env.is_active());
2916    }
2917
2918    #[test]
2919    fn test_become_replica() {
2920        let env = ReplicatedEnvironment::new(test_config("node2")).unwrap();
2921        env.become_replica("node1").unwrap();
2922        assert_eq!(env.get_state(), NodeState::Replica);
2923        assert!(!env.is_master());
2924        assert!(env.is_replica());
2925        assert!(env.is_active());
2926    }
2927
2928    #[test]
2929    fn test_get_node_name() {
2930        let env = ReplicatedEnvironment::new(test_config("my_node")).unwrap();
2931        assert_eq!(env.get_node_name(), "my_node");
2932    }
2933
2934    #[test]
2935    fn test_get_group_name() {
2936        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2937        assert_eq!(env.get_group_name(), "test_group");
2938    }
2939
2940    #[test]
2941    fn test_register_vlsn_updates_index() {
2942        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2943        env.register_vlsn(1, 0, 100);
2944        env.register_vlsn(2, 0, 200);
2945        env.register_vlsn(3, 0, 300);
2946
2947        assert_eq!(env.get_current_vlsn(), 3);
2948        let range = env.get_vlsn_range();
2949        assert_eq!(range.first(), 1);
2950        assert_eq!(range.last(), 3);
2951    }
2952
2953    #[test]
2954    fn test_record_ack() {
2955        use crate::node_type::NodeType;
2956        use crate::rep_node::RepNode;
2957        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
2958        env.become_master(1).unwrap();
2959        // replicaAcksQualify: only ELECTABLE replicas count toward durability,
2960        // so the replica must be a known electable member of the group.
2961        env.add_peer(RepNode::new(
2962            "replica1".to_string(),
2963            NodeType::Electable,
2964            "127.0.0.1".to_string(),
2965            6001,
2966            2,
2967        ))
2968        .unwrap();
2969
2970        env.register_vlsn(1, 0, 100);
2971        // Register a pending ack requirement, then record ack
2972        env.get_ack_tracker().register(1, 1);
2973        env.record_ack(1, "replica1");
2974        // Ack should be satisfied
2975        assert!(env.get_ack_tracker().is_satisfied(1));
2976    }
2977
2978    #[test]
2979    fn test_record_ack_from_non_electable_does_not_qualify() {
2980        use crate::node_type::NodeType;
2981        use crate::rep_node::RepNode;
2982        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
2983        env.become_master(1).unwrap();
2984        // A Monitor is NOT electable -> its ack must not count (JE
2985        // DurabilityQuorum.replicaAcksQualify).
2986        env.add_peer(RepNode::new(
2987            "monitor1".to_string(),
2988            NodeType::Monitor,
2989            "127.0.0.1".to_string(),
2990            6002,
2991            3,
2992        ))
2993        .unwrap();
2994        env.register_vlsn(1, 0, 100);
2995        env.get_ack_tracker().register(1, 1);
2996        env.record_ack(1, "monitor1");
2997        assert!(
2998            !env.get_ack_tracker().is_satisfied(1),
2999            "non-electable ack must not satisfy durability quorum"
3000        );
3001        // An unknown replica likewise does not qualify.
3002        env.record_ack(1, "ghost");
3003        assert!(!env.get_ack_tracker().is_satisfied(1));
3004    }
3005
3006    #[test]
3007    fn test_authoritative_quorum_met() {
3008        // 1-node group (electable_total=1): master alone IS authoritative
3009        // (quorum_size = 1/2+1 = 1; 0 replicas + 1 >= 1).
3010        assert!(ReplicatedEnvironment::authoritative_quorum_met(0, 1));
3011        // 3-node group (electable_total=3, quorum_size=2): master with 0
3012        // connected replicas is the minority -> NOT authoritative.
3013        assert!(!ReplicatedEnvironment::authoritative_quorum_met(0, 3));
3014        // 3-node group with 1 connected electable replica -> 1+1=2 >= 2 -> yes.
3015        assert!(ReplicatedEnvironment::authoritative_quorum_met(1, 3));
3016        // 5-node group (quorum_size=3): need 2 connected replicas.
3017        assert!(!ReplicatedEnvironment::authoritative_quorum_met(1, 5));
3018        assert!(ReplicatedEnvironment::authoritative_quorum_met(2, 5));
3019    }
3020
3021    #[test]
3022    fn test_is_authoritative_master_requires_master_role() {
3023        // A non-master is never authoritative regardless of connections.
3024        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3025        assert!(!env.is_master());
3026        assert!(!env.is_authoritative_master());
3027        // A single-node master (no peers) IS authoritative.
3028        env.become_master(1).unwrap();
3029        assert!(env.is_authoritative_master());
3030    }
3031
3032    #[test]
3033    fn test_dtvlsn_update_max_advances_only() {
3034        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3035        assert_eq!(env.get_dtvlsn(), 0);
3036        assert_eq!(env.update_dtvlsn(10), 10);
3037        assert_eq!(env.get_dtvlsn(), 10);
3038        // A lower candidate must not move it backward.
3039        assert_eq!(env.update_dtvlsn(5), 10);
3040        assert_eq!(env.get_dtvlsn(), 10);
3041        // Equal is a no-op.
3042        assert_eq!(env.update_dtvlsn(10), 10);
3043        // set_dtvlsn (replica path) is also advance-only.
3044        env.set_dtvlsn(7);
3045        assert_eq!(env.get_dtvlsn(), 10);
3046        env.set_dtvlsn(20);
3047        assert_eq!(env.get_dtvlsn(), 20);
3048    }
3049
3050    #[test]
3051    fn test_dtvlsn_majority_min_across_feeders() {
3052        use crate::node_type::NodeType;
3053        use crate::rep_node::RepNode;
3054        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3055        env.become_master(1).unwrap();
3056        // Three electable replicas → electable_count = 4 (incl. master) →
3057        // durable_ack_count = 2. With master self-ack, DTVLSN advances to the
3058        // min of the 2 highest qualifying feeders that exceed the current
3059        // DTVLSN.
3060        for (i, name) in ["r1", "r2", "r3"].iter().enumerate() {
3061            env.add_peer(RepNode::new(
3062                name.to_string(),
3063                NodeType::Electable,
3064                "127.0.0.1".to_string(),
3065                6100 + i as u16,
3066                (i + 2) as u32,
3067            ))
3068            .unwrap();
3069        }
3070        // Register feeders with differing acked VLSNs: r1=100, r2=80, r3=50.
3071        for (name, vlsn) in [("r1", 100u64), ("r2", 80), ("r3", 50)] {
3072            let f = crate::stream::feeder::Feeder::new(name.to_string());
3073            f.record_ack(vlsn);
3074            env.feeders.write().push(f);
3075        }
3076        env.update_dtvlsn_from_feeders();
3077        // First two qualifying feeders encountered are r1(100), r2(80);
3078        // min(100,80)=80 and that is a majority (2 of 4) → DTVLSN = 80.
3079        // (r3=50 < 80 is not required for durability.)
3080        assert!(
3081            env.get_dtvlsn() >= 80,
3082            "DTVLSN must reach the majority-min (>=80), got {}",
3083            env.get_dtvlsn()
3084        );
3085    }
3086
3087    #[test]
3088    fn test_close_sets_shutdown() {
3089        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3090        assert!(!env.is_shutdown());
3091
3092        env.close().unwrap();
3093        assert!(env.is_shutdown());
3094        // After close, state should be Shutdown
3095        assert_eq!(env.get_state(), NodeState::Shutdown);
3096    }
3097
3098    #[test]
3099    fn test_close_is_idempotent() {
3100        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3101        env.close().unwrap();
3102        env.close().unwrap(); // Should not error
3103        assert!(env.is_shutdown());
3104    }
3105
3106    #[test]
3107    fn test_cannot_become_master_when_shutdown() {
3108        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3109        env.close().unwrap();
3110
3111        let result = env.become_master(1);
3112        assert!(result.is_err());
3113    }
3114
3115    #[test]
3116    fn test_cannot_become_replica_when_shutdown() {
3117        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3118        env.close().unwrap();
3119
3120        let result = env.become_replica("master");
3121        assert!(result.is_err());
3122    }
3123
3124    #[test]
3125    fn test_cannot_apply_entry_when_shutdown() {
3126        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3127        env.close().unwrap();
3128
3129        let result = env.apply_entry(1, 0, vec![1, 2, 3]);
3130        assert!(result.is_err());
3131    }
3132
3133    #[test]
3134    fn test_cannot_transfer_master_when_not_master() {
3135        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3136        env.become_replica("other").unwrap();
3137
3138        let config = MasterTransferConfig::new(
3139            "target_node".to_string(),
3140            Duration::from_secs(30),
3141        );
3142        let result = env.transfer_master(config);
3143        assert!(result.is_err());
3144    }
3145
3146    #[test]
3147    fn test_transfer_master_requires_registered_target() {
3148        // F7: transfer_master is no longer a no-op; it sends an ADMIN
3149        // TRANSFER_MASTER signal to the target via TCP.  An unregistered
3150        // target is rejected at the address-resolution step.
3151        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3152        env.become_master(1).unwrap();
3153
3154        let config = MasterTransferConfig::new(
3155            "unknown_target".to_string(),
3156            Duration::from_secs(30),
3157        );
3158        let result = env.transfer_master(config);
3159        assert!(
3160            result.is_err(),
3161            "transfer_master to unregistered target must error"
3162        );
3163    }
3164
3165    #[test]
3166    fn test_apply_entry_registers_vlsn() {
3167        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3168        env.become_replica("master").unwrap();
3169
3170        env.apply_entry(1, 0, vec![1, 2, 3]).unwrap();
3171        env.apply_entry(2, 0, vec![4, 5, 6]).unwrap();
3172
3173        assert_eq!(env.get_current_vlsn(), 2);
3174    }
3175
3176    #[test]
3177    fn test_master_name_tracking() {
3178        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3179
3180        // Initially no master known
3181        assert!(env.get_master_name().is_none());
3182
3183        // After becoming master, this node is the master
3184        env.become_master(1).unwrap();
3185        assert_eq!(env.get_master_name(), Some("node1".to_string()));
3186    }
3187
3188    #[test]
3189    fn test_master_to_replica_transition() {
3190        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3191
3192        // Become master first
3193        env.become_master(1).unwrap();
3194        assert_eq!(env.get_master_name(), Some("node1".to_string()));
3195
3196        // Transition to replica (Master -> Replica is valid)
3197        env.become_replica("other_master").unwrap();
3198        assert_eq!(env.get_master_name(), Some("other_master".to_string()));
3199        assert!(env.is_replica());
3200    }
3201
3202    #[test]
3203    fn test_state_change_listener_notification() {
3204        struct TestListener {
3205            call_count: AtomicU32,
3206            last_new_state: noxu_sync::Mutex<Option<NodeState>>,
3207        }
3208
3209        impl StateChangeListener for TestListener {
3210            fn on_state_change(&self, event: StateChangeEvent) {
3211                self.call_count.fetch_add(1, AtomicOrdering::SeqCst);
3212                *self.last_new_state.lock() = Some(event.new_state);
3213            }
3214        }
3215
3216        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3217        let listener = Arc::new(TestListener {
3218            call_count: AtomicU32::new(0),
3219            last_new_state: noxu_sync::Mutex::new(None),
3220        });
3221
3222        // Setting the listener should trigger an immediate notification
3223        env.set_state_change_listener(listener.clone());
3224        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 1);
3225
3226        // State change should trigger another notification
3227        env.become_master(1).unwrap();
3228        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 2);
3229        assert_eq!(*listener.last_new_state.lock(), Some(NodeState::Master));
3230    }
3231
3232    #[test]
3233    fn test_close_notifies_listeners() {
3234        struct ShutdownListener {
3235            shutdown_seen: AtomicBool,
3236        }
3237
3238        impl StateChangeListener for ShutdownListener {
3239            fn on_state_change(&self, event: StateChangeEvent) {
3240                if event.new_state == NodeState::Shutdown {
3241                    self.shutdown_seen.store(true, AtomicOrdering::SeqCst);
3242                }
3243            }
3244        }
3245
3246        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3247        let listener = Arc::new(ShutdownListener {
3248            shutdown_seen: AtomicBool::new(false),
3249        });
3250
3251        // The initial notification is for the current (Detached) state
3252        env.set_state_change_listener(listener.clone());
3253
3254        // Become master first so the close transition is meaningful
3255        env.become_master(1).unwrap();
3256        assert!(!listener.shutdown_seen.load(AtomicOrdering::SeqCst));
3257
3258        env.close().unwrap();
3259        assert!(listener.shutdown_seen.load(AtomicOrdering::SeqCst));
3260    }
3261
3262    #[test]
3263    fn test_shutdown_group_requires_master() {
3264        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3265        env.become_replica("other").unwrap();
3266
3267        let result = env.shutdown_group(5000);
3268        assert!(result.is_err());
3269    }
3270
3271    #[test]
3272    fn test_shutdown_group_as_master() {
3273        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3274        env.become_master(1).unwrap();
3275
3276        let result = env.shutdown_group(5000);
3277        assert!(result.is_ok());
3278        assert!(env.is_shutdown());
3279    }
3280
3281    #[test]
3282    fn test_get_config() {
3283        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3284        assert_eq!(env.get_config().node_name, "node1");
3285        assert_eq!(env.get_config().group_name, "test_group");
3286    }
3287
3288    #[test]
3289    fn test_get_stats() {
3290        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3291        let _stats = env.get_stats();
3292        // Just verify we can access stats without panicking
3293    }
3294
3295    // -----------------------------------------------------------------------
3296    // TCP dispatcher tests (H-5 / H-7)
3297    // -----------------------------------------------------------------------
3298
3299    #[test]
3300    fn test_tcp_dispatcher_starts_on_new() {
3301        // Use port 0 so the OS assigns an ephemeral port.
3302        let env =
3303            ReplicatedEnvironment::new(test_config_port0("tcp_node")).unwrap();
3304        // The dispatcher must have started and bound a real port.
3305        let addr = env.bound_addr();
3306        assert!(addr.is_some(), "expected a bound address");
3307        let addr = addr.unwrap();
3308        assert_ne!(addr.port(), 0, "OS should assign a non-zero port");
3309    }
3310
3311    #[test]
3312    fn test_tcp_dispatcher_stops_on_close() {
3313        let env =
3314            ReplicatedEnvironment::new(test_config_port0("tcp_node2")).unwrap();
3315        // Dispatcher is running.
3316        assert!(
3317            env.tcp_dispatcher
3318                .as_ref()
3319                .map(|d| d.is_running())
3320                .unwrap_or(false)
3321        );
3322
3323        env.close().unwrap();
3324
3325        // After close, dispatcher must be stopped.
3326        assert!(
3327            !env.tcp_dispatcher
3328                .as_ref()
3329                .map(|d| d.is_running())
3330                .unwrap_or(false),
3331            "dispatcher should be stopped after close"
3332        );
3333    }
3334
3335    #[test]
3336    fn test_tcp_dispatcher_accepts_connection() {
3337        use crate::net::Channel;
3338        use crate::net::ServiceHandler;
3339        use crate::net::service_dispatcher::connect_to_service;
3340        use std::sync::atomic::{AtomicU32, Ordering as AO};
3341        use std::time::Duration;
3342
3343        struct PingHandler {
3344            count: AtomicU32,
3345        }
3346        impl ServiceHandler for PingHandler {
3347            fn service_name(&self) -> &str {
3348                "ping"
3349            }
3350            fn handle(&self, ch: Box<dyn Channel>) -> crate::error::Result<()> {
3351                self.count.fetch_add(1, AO::SeqCst);
3352                // Echo the first message back.
3353                if let Ok(Some(msg)) = ch.receive(Duration::from_secs(2)) {
3354                    let _ = ch.send(&msg);
3355                }
3356                Ok(())
3357            }
3358        }
3359
3360        let env =
3361            ReplicatedEnvironment::new(test_config_port0("tcp_node3")).unwrap();
3362        let addr = env.bound_addr().expect("dispatcher must be bound");
3363
3364        // Register a ping handler on the running dispatcher.
3365        if let Some(ref disp) = env.tcp_dispatcher {
3366            let handler = Arc::new(PingHandler { count: AtomicU32::new(0) });
3367            disp.register("ping", handler.clone());
3368
3369            // Give the accept thread a moment.
3370            std::thread::sleep(Duration::from_millis(20));
3371
3372            let client = connect_to_service(addr, "ping").unwrap();
3373            client.send(b"hello").unwrap();
3374            let reply = client.receive(Duration::from_secs(2)).unwrap();
3375            assert_eq!(reply, Some(b"hello".to_vec()));
3376
3377            assert_eq!(handler.count.load(AO::SeqCst), 1);
3378        }
3379
3380        env.close().unwrap();
3381    }
3382
3383    #[test]
3384    fn test_become_master_auto_transitions_from_detached() {
3385        // The state machine requires Detached -> Unknown -> Master.
3386        // become_master() should handle this automatically.
3387        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3388        assert_eq!(env.get_state(), NodeState::Detached);
3389        env.become_master(1).unwrap();
3390        assert_eq!(env.get_state(), NodeState::Master);
3391    }
3392
3393    #[test]
3394    fn test_become_replica_auto_transitions_from_detached() {
3395        // The state machine requires Detached -> Unknown -> Replica.
3396        // become_replica() should handle this automatically.
3397        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3398        assert_eq!(env.get_state(), NodeState::Detached);
3399        env.become_replica("master_node").unwrap();
3400        assert_eq!(env.get_state(), NodeState::Replica);
3401    }
3402
3403    #[test]
3404    fn test_cannot_transfer_master_when_shutdown() {
3405        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3406        env.become_master(1).unwrap();
3407        env.close().unwrap();
3408
3409        let config = MasterTransferConfig::new(
3410            "target".to_string(),
3411            Duration::from_secs(30),
3412        );
3413        let result = env.transfer_master(config);
3414        assert!(result.is_err());
3415    }
3416
3417    #[test]
3418    fn test_full_lifecycle() {
3419        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3420
3421        // Start as detached
3422        assert_eq!(env.get_state(), NodeState::Detached);
3423
3424        // Become master
3425        env.become_master(1).unwrap();
3426        assert!(env.is_master());
3427
3428        // Register some VLSNs
3429        env.register_vlsn(1, 0, 100);
3430        env.register_vlsn(2, 0, 200);
3431
3432        // Record ack from replica
3433        env.record_ack(1, "replica1");
3434        env.record_ack(2, "replica1");
3435
3436        // Transition to replica (simulating failover)
3437        env.become_replica("node2").unwrap();
3438        assert!(env.is_replica());
3439
3440        // Apply entries from new master
3441        env.apply_entry(3, 0, vec![7, 8, 9]).unwrap();
3442
3443        // Close
3444        env.close().unwrap();
3445        assert!(env.is_shutdown());
3446    }
3447
3448    /// Verify that `with_environment` lazily registers the RESTORE service on
3449    /// the TCP dispatcher when `config.env_home` was not set at construction.
3450    ///
3451    /// This mirrors`RepNode.envSetup()` which registers the restore handler
3452    /// when the environment is wired into the replicated node.
3453    #[test]
3454    fn test_restore_registered_lazily_via_with_environment() {
3455        use noxu_dbi::EnvironmentImpl;
3456        use tempfile::TempDir;
3457
3458        let dir = TempDir::new().expect("temp dir");
3459
3460        // Build config WITHOUT env_home — dispatcher starts, but no RESTORE handler yet.
3461        let config = RepConfig::builder("test_group", "node1", "127.0.0.1")
3462            .node_port(0)
3463            .build();
3464
3465        let rep_env = ReplicatedEnvironment::new(config).unwrap();
3466
3467        // Not yet registered.
3468        assert!(
3469            !rep_env
3470                .restore_registered
3471                .load(std::sync::atomic::Ordering::SeqCst)
3472        );
3473
3474        // Wire in a real EnvironmentImpl so get_env_home() returns the temp dir.
3475        let env_impl = Arc::new(
3476            EnvironmentImpl::new(dir.path(), false, false).expect("open env"),
3477        );
3478        rep_env.with_environment(env_impl);
3479
3480        // Now the RESTORE service must be registered.
3481        assert!(
3482            rep_env
3483                .restore_registered
3484                .load(std::sync::atomic::Ordering::SeqCst)
3485        );
3486    }
3487
3488    /// Verify that when `config.env_home` IS set at construction, the RESTORE
3489    /// service is registered immediately (not deferred).
3490    #[test]
3491    fn test_restore_registered_eagerly_when_env_home_in_config() {
3492        use tempfile::TempDir;
3493
3494        let dir = TempDir::new().expect("temp dir");
3495
3496        let config = RepConfig::builder("test_group", "node2", "127.0.0.1")
3497            .node_port(0)
3498            .env_home(dir.path())
3499            .build();
3500
3501        let rep_env = ReplicatedEnvironment::new(config).unwrap();
3502
3503        // Should be registered immediately (env_home was in config).
3504        assert!(
3505            rep_env
3506                .restore_registered
3507                .load(std::sync::atomic::Ordering::SeqCst)
3508        );
3509    }
3510}