Skip to main content

noxu_rep/
replicated_environment.rs

1//! The main replicated environment API.
2//!
3//!
4//! A replicated database environment that is a node in a replication group.
5//! This is the entry point for replication. It wraps a standard Environment
6//! and adds replication capabilities including master election, replica
7//! streaming, and commit acknowledgments.
8//!
9//! # Replication node states
10//!
11//! The replication node state determines the operations that the application
12//! can perform against its replicated environment. The state transitions
13//! visible to the application can be summarized by the regular expression:
14//!
15//! ```text
16//! [ MASTER | REPLICA | UNKNOWN ]+ DETACHED
17//! ```
18//!
19//! When the first handle to a `ReplicatedEnvironment` is created and the node
20//! is brought up, the node usually establishes Master or Replica state. These
21//! states are preceded by the Unknown state. As various remote nodes become
22//! unavailable and elections are held, the local node may change between
23//! Master and Replica states, always with a (usually brief) transition through
24//! Unknown state.
25//!
26//! When the environment is closed, the node transitions to the Detached state.
27
28use noxu_dbi::{
29    AckWaitError, AckWaitErrorKind, EnvironmentImpl, ReplicaAckCoordinator,
30    ReplicaAckPolicyKind,
31};
32use noxu_sync::RwLock;
33use std::net::SocketAddr;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::OnceLock;
37use std::sync::Weak;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use crate::ack_tracker::AckTracker;
42use crate::elections::election_service::{
43    ELECTION_SERVICE_NAME, ElectionAcceptorState, ElectionService,
44};
45use crate::elections::master_tracker::MasterTracker;
46use crate::error::{RepError, Result};
47use crate::group_service::GroupService;
48use crate::master_transfer::MasterTransferConfig;
49use crate::net::service_dispatcher::{
50    AnyServiceDispatcher, TcpServiceDispatcher,
51};
52use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
53use crate::network_restore_server::{
54    NetworkRestoreServer, RESTORE_SERVICE_NAME,
55};
56use crate::node_state::{NodeState, NodeStateMachine};
57use crate::rep_config::RepConfig;
58use crate::rep_stats::RepStats;
59use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
60use crate::stream::feeder::EnvironmentLogScanner;
61use crate::stream::feeder::Feeder;
62use crate::stream::feeder::FeederRunner;
63use crate::stream::peer_feeder::PeerScannerAdapter;
64use crate::stream::peer_feeder::{
65    PEER_FEEDER_SERVICE_NAME, PeerFeederService, PeerLogScanner,
66};
67use crate::stream::replica_stream::{EnvironmentLogWriter, ReplicaStream};
68use crate::stream::syncup::{
69    Matchpoint, RollbackDecision, find_matchpoint, verify_rollback,
70};
71use crate::stream::syncup_reader::VlsnIndexView;
72use crate::vlsn::vlsn_index::VlsnIndex;
73use crate::vlsn::vlsn_range::VlsnRange;
74use std::collections::HashMap;
75
/// Heartbeat timeout handed to the [`MasterTracker`] for master liveness
/// detection: thirty seconds, expressed in milliseconds.
const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(30_000);
78
79/// A replicated database environment.
80///
81///
82///
83/// This is the entry point for replication. It wraps a standard Environment
84/// and adds replication capabilities including master election, replica
85/// streaming, and commit acknowledgments.
86///
87/// High Availability (HA) provides a replicated, embedded database
88/// management system which provides fast, reliable, and scalable data
89/// management. HA enables replication of an environment across a Replication
90/// Group. A `ReplicatedEnvironment` is a single node in the replication group.
91///
92/// `ReplicatedEnvironment` wraps a standard `Environment`. All database
93/// operations are executed in the same fashion in both replicated and
94/// non-replicated applications. A `ReplicatedEnvironment` must be
95/// transactional. All replicated databases created in the replicated
96/// environment must be transactional as well.
97///
98/// A `ReplicatedEnvironment` joins its replication group when it is created.
99/// When `new()` returns, the node will have established contact with the other
100/// members of the group and will be ready to service operations.
101///
102/// Replicated environments can be created with node type Electable or
103/// Secondary. Electable nodes can be masters or replicas, and participate in
104/// both master elections and commit durability decisions. Secondary nodes can
105/// only be replicas, not masters, and do not participate in either elections or
106/// durability decisions.
107///
108/// # Example
109///
110/// ```ignore
111/// use noxu_rep::{ReplicatedEnvironment, RepConfig};
112///
113/// let config = RepConfig::builder("my_group", "node1", "localhost")
114///     .node_port(5001)
115///     .build();
116/// let rep_env = ReplicatedEnvironment::new(config).unwrap();
117/// ```
118/// Outcome of [`ReplicatedEnvironment::syncup_with_feeder`] — the action taken
119/// by a live diverged-tail syncup. Port of the branch in JE
120/// `ReplicaFeederSyncup.execute` between a soft rollback and a network restore.
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub enum SyncupAction {
123    /// The divergent tail was rolled back to the matchpoint; resume streaming
124    /// from `start_vlsn` (`matchpoint + 1`). `matchpoint_vlsn == last VLSN`
125    /// means the replica was not diverged and nothing was truncated.
126    RolledBack { matchpoint_vlsn: u64, start_vlsn: u64 },
127    /// No safe rollback (no common matchpoint, or it would cross a committed
128    /// txn); the replica must do a full network restore.
129    NeedsRestore,
130}
131
132pub struct ReplicatedEnvironment {
133    /// The replication configuration for this node.
134    config: RepConfig,
135    /// Tracks the current node state (Detached, Unknown, Master, Replica).
136    node_state: NodeStateMachine,
137    /// Service for managing the replication group membership.
138    group_service: GroupService,
139    /// Maps VLSNs to log file positions.
140    ///
141    /// Wrapped in `Arc` so that background daemons (election driver,
142    /// VLSN-index persistence flusher) can share access without
143    /// borrowing the env.  Closes finding F11 (
144    /// the 2026 review).
145    vlsn_index: Arc<VlsnIndex>,
146    /// Tracks acknowledgments from replicas (used by master).
147    ack_tracker: AckTracker,
148    /// Replication statistics.
149    stats: RepStats,
150    /// Active feeder threads (master -> replica streams).
151    feeders: RwLock<Vec<Feeder>>,
152    /// Replica stream for receiving updates from the master.
153    replica_stream: ReplicaStream,
154    /// Tracks the current master node.
155    master_tracker: MasterTracker,
156    /// State change listeners.
157    listeners: RwLock<Vec<Arc<dyn StateChangeListener>>>,
158    /// Shutdown flag.
159    shutdown: AtomicBool,
160    /// Service dispatcher — listens on the replication port and routes
161    /// incoming connections to the appropriate service handler (feeder, etc.).
162    ///
163    /// `Plain`: plain TCP (default / Phase-2 behaviour).
164    /// `Tls`: TLS + mTLS enforcement (Phase 3, when `RepConfig::tls_config` is set
165    /// and `transport_kind` is `Tls`).
166    ///
167    /// `None` only when the bind address cannot be resolved.
168    tcp_dispatcher: Option<AnyServiceDispatcher>,
169    /// The address the `tcp_dispatcher` is actually bound to (may differ from
170    /// the configured port when port 0 is used in tests).
171    bound_addr: Option<SocketAddr>,
172
173    /// Optional live `EnvironmentImpl` wired in via [`with_environment`].
174    ///
175    /// When set, `become_master` spawns a `FeederRunner` per replica using
176    /// `EnvironmentLogScanner`, and `become_replica` spawns a
177    /// `ReplicaReceiver` thread using `EnvironmentLogWriter`.
178    ///
179    /// In HA.
180    env_impl: StdMutex<Option<Arc<EnvironmentImpl>>>,
181
182    /// Background I/O thread handles spawned during state transitions.
183    ///
184    /// Stored so that `close()` can join them cleanly.  Each handle is
185    /// `Option` so we can `take()` it in `close()`.
186    io_threads: StdMutex<Vec<std::thread::JoinHandle<()>>>,
187
188    /// Shutdown flag shared with I/O threads so they terminate when the
189    /// environment is closed.
190    io_shutdown: AtomicBool,
191
192    /// Whether the RESTORE service has been registered on the TCP dispatcher.
193    ///
194    /// When `config.env_home` is `None` at construction time, registration is
195    /// deferred until `with_environment()` provides the env home path.
196    restore_registered: AtomicBool,
197
198    /// In-memory log queue used by the peer feeder service.
199    ///
200    /// When this node is a replica, `apply_entry()` pushes each received log
201    /// entry here.  The `PeerFeederService` registered on the TCP dispatcher
202    /// reads from this queue to stream entries to downstream replicas that
203    /// are behind this node (peer-to-peer log distribution, HA style).
204    peer_scanner: Arc<PeerLogScanner>,
205
206    /// Durable Transaction VLSN (D7, JE RepNode.dtvlsn): the highest VLSN
207    /// known to have been replicated to a *majority* of the electable
208    /// replicas. On a master it is computed from feeder ack/heartbeat progress
209    /// (`update_dtvlsn_from_feeders`); on a replica it is set from commit/abort
210    /// records in the stream (`set_dtvlsn`). It advances monotonically (an
211    /// `update_max`). 0 = NULL_VLSN. Used by the election ranking (D2) so the
212    /// most-durable node, not merely the highest-raw-VLSN node, wins.
213    dtvlsn: std::sync::atomic::AtomicU64,
214
215    /// Shared acceptor state used by the ELECTION service handler.
216    /// The election driver updates `own_vlsn` / `own_term` here as the
217    /// node progresses; incoming acceptor sessions read it on every
218    /// connection so their replies always reflect the local node's
219    /// most recent state.  Closes finding F6.
220    election_state: Arc<ElectionAcceptorState>,
221
222    /// Self-referential `Weak` populated once the env has been wrapped
223    /// in an `Arc`.  Used by the replica I/O thread spawned in
224    /// `become_replica` so it can call `bootstrap_via_dispatcher` when
225    /// the master signals `NeedsRestore`.
226    ///
227    /// Populated lazily via [`Self::init_self_weak`] from `open()` and
228    /// the test harness.  When unset (callers that build the env via
229    /// raw `Arc::new(Self::new(...))` and never call `init_self_weak`)
230    /// the I/O thread falls back to operator-driven bootstrap.
231    self_weak: OnceLock<Weak<Self>>,
232
233    // -----------------------------------------------------------------------
234    // C-C2: active push-feeder infrastructure
235    // -----------------------------------------------------------------------
236    /// Per-replica channels injected via [`Self::register_feeder_channel`].
237    ///
238    /// When [`Self::become_master`] is called (or when the node is already
239    /// master), a [`FeederRunner`] thread is spawned for each registered
240    /// channel, actively streaming entries to that replica over the channel.
241    ///
242    /// Using `register_feeder_channel` is the primary integration point for
243    /// the push-based feeder path.  Production deployments wire in a
244    /// `TcpChannel`; test code uses `LocalChannelPair`.
245    feeder_channels: StdMutex<HashMap<String, Arc<dyn crate::net::Channel>>>,
246
247    /// Per-replica dedicated entry queues backing the push-feeder path.
248    ///
249    /// Each `FeederRunner` thread reads exclusively from its replica's queue.
250    /// [`Self::replicate_entry`] and [`Self::apply_entry`] fan out into all
251    /// registered queues so the push runners receive entries without competing
252    /// with [`PeerFeederService`] for ownership of `peer_scanner`.
253    feeder_queues: std::sync::RwLock<HashMap<String, Arc<PeerLogScanner>>>,
254
255    /// Active `FeederRunner` references for acked-VLSN queries and
256    /// clean shutdown (M-4: wait for replicas to catch up).
257    active_feeder_runners: StdMutex<HashMap<String, Arc<FeederRunner>>>,
258
259    /// Monotone VLSN counter shared with the wired `EnvironmentImpl`.
260    ///
261    /// Installed into the environment via
262    /// `EnvironmentImpl::set_replication_vlsn_counter()` when
263    /// `with_environment` is called.  Each `log_txn_commit` on the master
264    /// atomically increments this counter and writes a VLSN-tagged WAL entry,
265    /// which `EnvironmentLogScanner` then picks up without any
266    /// `replicate_entry` call from the application.
267    wal_vlsn_counter: Arc<std::sync::atomic::AtomicU64>,
268
269    /// REP-10 (C): the replica-side consistency tracker, built from the
270    /// REP-7 `last_applied_vlsn` handle when the replica replay thread starts
271    /// (`become_replica`).  `None` on a master or before replay is wired.
272    ///
273    /// A read that begins on a replica with a non-`NoConsistency` policy waits
274    /// on this tracker (`begin_read_consistency`).  Port of
275    /// `RepImpl.getConsistency` / `Replica.getConsistencyTracker`.
276    consistency_tracker: StdMutex<Option<crate::ConsistencyTracker>>,
277}
278
279impl ReplicatedEnvironment {
280    /// Create a new replicated environment.
281    ///
282    ///
283    ///
284    /// Creates a replicated environment handle and starts participating in the
285    /// replication group. The node's state is determined when it joins the
286    /// group, and mastership is not preconfigured. If the group has no current
287    /// master, creation will trigger an election to determine whether this node
288    /// will participate as a Master or a Replica.
289    ///
290    /// A brand new node will always join an existing group as a Replica, unless
291    /// it is the very first electable node that is creating the group. In that
292    /// case it joins as the Master of the newly formed singleton group.
293    pub fn new(config: RepConfig) -> Result<Self> {
294        // mTLS Phase 2 (v3.1.0): peer_allowlist enforcement is real at the
295        // TLS channel layer (TlsTcpChannelListener::bind_with_tls_and_allowlist).
296        // Phase 3 (this release): when RepConfig::tls_config is set AND
297        // transport_kind is Tls, the service dispatcher itself enforces mTLS
298        // via TlsTcpServiceDispatcher.  For the remaining cases (no TlsConfig
299        // or non-TLS transport) keep the Phase-2 accurate warn.
300        if !config.peer_allowlist.is_empty() {
301            match config.transport_kind {
302                crate::rep_config::RepTransportKind::Tls => {
303                    if config.tls_config.is_some() {
304                        log::info!(
305                            "[{}] peer_allowlist ({} entries) + tls_config set; \
306                             mTLS will be enforced on the service dispatcher.",
307                            config.node_name,
308                            config.peer_allowlist.len(),
309                        );
310                    } else {
311                        log::info!(
312                            "[{}] peer_allowlist configured ({} entries) but \
313                             tls_config is None — the service dispatcher will \
314                             use plain TCP. Set RepConfig::tls_config to \
315                             activate end-to-end mTLS on this path.",
316                            config.node_name,
317                            config.peer_allowlist.len(),
318                        );
319                    }
320                }
321                _ => {
322                    log::warn!(
323                        "[{}] peer_allowlist is configured ({} entries) but \
324                         transport_kind is not Tls — the allowlist has no \
325                         effect without TLS transport. Set \
326                         RepTransportKind::Tls to activate mTLS enforcement.",
327                        config.node_name,
328                        config.peer_allowlist.len(),
329                    );
330                }
331            }
332        }
333        let node_state = NodeStateMachine::new();
334        let group_service = GroupService::new(config.group_name.clone());
335        let vlsn_index = {
336            // F11: try to load a previously persisted vlsn.idx from
337            // env_home if one exists.  A successfully loaded index lets a
338            // restarted replica resume from where it left off without a
339            // full network restore; a missing or corrupt file falls back
340            // to a fresh in-memory index (caller will need to bootstrap).
341            if let Some(ref home) = config.env_home {
342                match crate::vlsn::persist::load_from_disk(home) {
343                    Ok(Some(idx)) => {
344                        log::info!(
345                            "Node '{}' loaded persisted VLSN index from {} \
346                             ({} entries, latest vlsn={})",
347                            config.node_name,
348                            home.display(),
349                            idx.snapshot_entries().len(),
350                            idx.get_latest_vlsn(),
351                        );
352                        Arc::new(idx)
353                    }
354                    Ok(None) => Arc::new(VlsnIndex::new(10)),
355                    Err(e) => {
356                        log::warn!(
357                            "Node '{}' failed to load persisted VLSN index \
358                             from {}: {} (treating as fresh node — network \
359                             restore required)",
360                            config.node_name,
361                            home.display(),
362                            e
363                        );
364                        // Best-effort: remove the corrupt file so the
365                        // next persist cycle writes a clean one.  A
366                        // missing file is the "fresh node" baseline.
367                        let _ = std::fs::remove_file(
368                            crate::vlsn::persist::index_path(home),
369                        );
370                        Arc::new(VlsnIndex::new(10))
371                    }
372                }
373            } else {
374                Arc::new(VlsnIndex::new(10))
375            }
376        };
377        let ack_tracker = AckTracker::new();
378        let stats = RepStats::new();
379        let feeders = RwLock::new(Vec::new());
380        let replica_stream = ReplicaStream::new();
381        let master_tracker = MasterTracker::new(DEFAULT_HEARTBEAT_TIMEOUT);
382
383        // Start the service dispatcher.
384        //
385        // Phase 3: when RepConfig::tls_config is set AND transport_kind is Tls,
386        // start a TlsTcpServiceDispatcher (mTLS enforced).  Otherwise fall back
387        // to the plain-TCP TcpServiceDispatcher.
388        let listen_addr_str =
389            format!("{}:{}", config.node_host, config.node_port);
390        let mut restore_registered_init = false;
391
392        // Returns (AnyServiceDispatcher, bound_addr) or (None, None) on error.
393        let (tcp_dispatcher, bound_addr) = match listen_addr_str
394            .parse::<SocketAddr>()
395        {
396            Ok(addr) => {
397                let build_result: Result<(AnyServiceDispatcher, SocketAddr)> =
398                    Self::build_dispatcher(&config, addr);
399                match build_result {
400                    Ok((dispatcher, bound)) => {
401                        // Register the network restore handler.
402                        if let Some(ref home) = config.env_home {
403                            let restore_server =
404                                NetworkRestoreServer::new(home.clone());
405                            dispatcher.register(
406                                RESTORE_SERVICE_NAME,
407                                Arc::new(restore_server),
408                            );
409                            log::debug!(
410                                "Node '{}' RESTORE service registered \
411                                     (env_home={})",
412                                config.node_name,
413                                home.display(),
414                            );
415                            restore_registered_init = true;
416                        }
417                        let kind =
418                            if dispatcher.is_tls() { "TLS" } else { "TCP" };
419                        log::info!(
420                            "Node '{}' {} service dispatcher started on {}",
421                            config.node_name,
422                            kind,
423                            bound
424                        );
425                        (Some(dispatcher), Some(bound))
426                    }
427                    Err(e) => {
428                        log::warn!(
429                            "Node '{}' failed to start service dispatcher \
430                             on {}: {}",
431                            config.node_name,
432                            listen_addr_str,
433                            e
434                        );
435                        (None, None)
436                    }
437                }
438            }
439            Err(e) => {
440                log::warn!(
441                    "Node '{}' cannot parse listen address '{}': {}",
442                    config.node_name,
443                    listen_addr_str,
444                    e
445                );
446                (None, None)
447            }
448        };
449
450        // Build the in-memory peer log scanner; register the peer feeder
451        // service on the dispatcher so downstream replicas can connect.
452        let peer_scanner = Arc::new(PeerLogScanner::new());
453        // F5/F31: build the acceptor state with persistence enabled when
454        // env_home is configured.  Crash-durable promises are required
455        // for the Paxos safety invariant after a process restart.
456        let election_state =
457            Arc::new(if let Some(ref home) = config.env_home {
458                ElectionAcceptorState::with_env_home(
459                    config.node_name.clone(),
460                    1,
461                    home,
462                )
463            } else {
464                ElectionAcceptorState::new(config.node_name.clone(), 1)
465            });
466        if let Some(ref dispatcher) = tcp_dispatcher {
467            let service = PeerFeederService::new(Arc::clone(&peer_scanner));
468            dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(service));
469            log::debug!(
470                "Node '{}' PEER_FEEDER service registered",
471                config.node_name,
472            );
473            // F6: register the ELECTION service so peers can run
474            // run_acceptor against this node when proposing.
475            let election_svc =
476                Arc::new(ElectionService::new(Arc::clone(&election_state)));
477            dispatcher.register(ELECTION_SERVICE_NAME, election_svc);
478            log::debug!(
479                "Node '{}' ELECTION service registered",
480                config.node_name,
481            );
482        }
483
484        let env = Self {
485            config,
486            node_state,
487            group_service,
488            vlsn_index,
489            ack_tracker,
490            stats,
491            feeders,
492            replica_stream,
493            master_tracker,
494            listeners: RwLock::new(Vec::new()),
495            shutdown: AtomicBool::new(false),
496            tcp_dispatcher,
497            bound_addr,
498            env_impl: StdMutex::new(None),
499            io_threads: StdMutex::new(Vec::new()),
500            io_shutdown: AtomicBool::new(false),
501            restore_registered: AtomicBool::new(restore_registered_init),
502            peer_scanner,
503            dtvlsn: std::sync::atomic::AtomicU64::new(0),
504            election_state,
505            self_weak: OnceLock::new(),
506            feeder_channels: StdMutex::new(HashMap::new()),
507            feeder_queues: std::sync::RwLock::new(HashMap::new()),
508            active_feeder_runners: StdMutex::new(HashMap::new()),
509            wal_vlsn_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
510            consistency_tracker: StdMutex::new(None),
511        };
512
513        Ok(env)
514    }
515
516    /// Open a replicated environment with the standard production
517    /// lifecycle.
518    ///
519    /// This is the entry point recommended by the mdBook chapters:
520    /// it allocates the `ReplicatedEnvironment`, registers all
521    /// services on the TCP dispatcher, and spawns the **election
522    /// driver** background thread that runs Paxos rounds against
523    /// known peers until the node has resolved into either Master or
524    /// Replica state.
525    ///
526    /// Closes finding F6 of the 2026 review.
527    ///
528    /// Use [`ReplicatedEnvironment::new`] directly only when the
529    /// caller plans to drive state transitions explicitly (test
530    /// harnesses, scripted bootstrap, recovery tooling).
531    pub fn open(config: RepConfig) -> Result<Arc<Self>> {
532        let env = Arc::new(Self::new(config)?);
533        env.init_self_weak();
534        env.start_election_driver();
535        env.start_vlsn_persistence_daemon();
536        env.register_admin_service();
537        Ok(env)
538    }
539
540    /// Build the service dispatcher for this node.
541    ///
542    /// Phase 3 logic: when `config.transport_kind == Tls` AND
543    /// `config.tls_config` is `Some`, start a
544    /// [`crate::net::service_dispatcher::TlsTcpServiceDispatcher`] that
545    /// enforces mTLS with the configured `peer_allowlist`.  Otherwise
546    /// start the plain-TCP [`TcpServiceDispatcher`].
547    ///
548    /// Returns `(dispatcher, bound_addr)` or a `RepError` on bind / TLS
549    /// config failure.
550    fn build_dispatcher(
551        #[cfg_attr(not(feature = "tls-rustls"), allow(unused_variables))]
552        config: &RepConfig,
553        addr: SocketAddr,
554    ) -> Result<(AnyServiceDispatcher, SocketAddr)> {
555        #[cfg(feature = "tls-rustls")]
556        if config.transport_kind == crate::rep_config::RepTransportKind::Tls {
557            use crate::auth::PeerAllowlist;
558            use crate::net::service_dispatcher::TlsTcpServiceDispatcher;
559            let tls = config.tls_config.as_ref().ok_or_else(|| {
560                RepError::ConfigError(format!(
561                    "node '{}': transport_kind=Tls requires a tls_config",
562                    config.node_name,
563                ))
564            })?;
565            let allowlist =
566                PeerAllowlist::new(config.peer_allowlist.iter().cloned());
567            // Fail-closed: an empty allowlist with TLS transport is a
568            // misconfiguration. The same policy is enforced at the TLS
569            // listener and QUIC constructors; downgrading to plain TCP here
570            // would be a silent security regression for a node that asked
571            // for TLS.
572            if allowlist.is_empty() {
573                return Err(RepError::ConfigError(format!(
574                    "node '{}': transport_kind=Tls requires a non-empty \
575                     peer_allowlist (mTLS enforcement is fail-closed)",
576                    config.node_name,
577                )));
578            }
579            let disp = TlsTcpServiceDispatcher::new(addr, tls, allowlist)?;
580            let bound = disp.start()?;
581            return Ok((AnyServiceDispatcher::Tls(disp), bound));
582        }
583        // Plain-TCP dispatcher (default or when TLS config is missing).
584        let disp = TcpServiceDispatcher::new(addr).map_err(|e| {
585            RepError::NetworkError(format!("TCP dispatcher init: {e}"))
586        })?;
587        let bound = disp.start()?;
588        Ok((AnyServiceDispatcher::Plain(disp), bound))
589    }
590
591    /// Populate the env's self-referential `Weak` so background
592    /// threads can obtain a back-reference for auto-orchestrated
593    /// follow-up actions (e.g. replica auto-bootstrap on
594    /// `NeedsRestore`).  Idempotent: subsequent calls are silent
595    /// no-ops because the inner [`OnceLock`] only accepts one set.
596    ///
597    /// Callers that wrap the env in `Arc` and want auto-bootstrap
598    /// behaviour should call this immediately after construction.
599    /// `Self::open` already does so.  Test harnesses that drive
600    /// transitions manually (`RepTestBase`) also call this so the
601    /// auto-bootstrap path is exercised in tests.
602    pub fn init_self_weak(self: &Arc<Self>) {
603        let _ = self.self_weak.set(Arc::downgrade(self));
604    }
605
606    /// Register the `ADMIN` service handler on the TCP dispatcher.
607    ///
608    /// Closes findings F7 / F8.  Holds a `Weak<Self>` so the handler
609    /// does not extend the env's lifetime.  Idempotent: re-registering
610    /// is harmless because `TcpServiceDispatcher::register` overwrites
611    /// the existing handler.
612    pub fn register_admin_service(self: &Arc<Self>) {
613        if let Some(ref dispatcher) = self.tcp_dispatcher {
614            crate::group_admin::register_admin_service(
615                dispatcher,
616                Arc::downgrade(self),
617            );
618            log::debug!(
619                "Node '{}' ADMIN service registered",
620                self.config.node_name,
621            );
622        }
623    }
624
625    /// Spawn the VLSN-index persistence daemon (F11).
626    ///
627    /// Periodically (every 2 seconds) snapshots the in-memory
628    /// `VlsnIndex` to `<env_home>/vlsn.idx` so a clean restart can
629    /// resume from where the replica left off without a full network
630    /// restore.  No-op when `config.env_home` is `None`.
631    ///
632    /// Idempotent: only one daemon is ever spawned per env.
633    pub fn start_vlsn_persistence_daemon(self: &Arc<Self>) {
634        let Some(home) = self.config.env_home.clone() else {
635            return;
636        };
637        {
638            let threads = self.io_threads.lock().unwrap();
639            if threads.iter().any(|h| {
640                h.thread()
641                    .name()
642                    .is_some_and(|n| n.starts_with("noxu-vlsn-flush-"))
643            }) {
644                return;
645            }
646        }
647
648        let vlsn_index = Arc::clone(&self.vlsn_index);
649        let name = format!("noxu-vlsn-flush-{}", self.config.node_name);
650        let me = Arc::clone(self);
651        let interval = Duration::from_secs(2);
652
653        let handle = std::thread::Builder::new()
654            .name(name)
655            .spawn(move || {
656                use std::sync::atomic::Ordering;
657                let mut last_persisted_vlsn: u64 = 0;
658                while !me.io_shutdown.load(Ordering::SeqCst)
659                    && !me.is_shutdown()
660                {
661                    std::thread::sleep(interval);
662                    if me.io_shutdown.load(Ordering::SeqCst) {
663                        break;
664                    }
665                    let latest = vlsn_index.get_latest_vlsn();
666                    if latest == last_persisted_vlsn {
667                        // Nothing new to flush.
668                        continue;
669                    }
670                    // X-2: cap the flush at the last durable checkpoint's
671                    // end LSN so the persisted VLSN index never claims
672                    // VLSNs beyond the durable B-tree state.  After a crash
673                    // the recovered tree and the index will be coherent.
674                    let cap_lsn = me
675                        .env_impl
676                        .lock()
677                        .unwrap()
678                        .as_ref()
679                        .and_then(|e| e.get_checkpointer())
680                        .map(|c| c.get_last_checkpoint_end())
681                        .unwrap_or(noxu_util::NULL_LSN);
682                    match crate::vlsn::persist::flush_to_disk_capped(
683                        &vlsn_index,
684                        &home,
685                        cap_lsn,
686                    ) {
687                        Ok(n) => {
688                            log::trace!(
689                                "vlsn-flush: persisted {} entries (latest vlsn={}, cap_lsn={:?})",
690                                n,
691                                latest,
692                                cap_lsn,
693                            );
694                            last_persisted_vlsn = latest;
695                        }
696                        Err(e) => {
697                            log::warn!(
698                                "vlsn-flush: failed to persist VLSN index to {}: {}",
699                                home.display(),
700                                e
701                            );
702                        }
703                    }
704                }
705                // Final flush on shutdown so a clean close is recoverable.
706                // Cap at the last checkpoint even for the shutdown flush.
707                let cap_lsn = me
708                    .env_impl
709                    .lock()
710                    .unwrap()
711                    .as_ref()
712                    .and_then(|e| e.get_checkpointer())
713                    .map(|c| c.get_last_checkpoint_end())
714                    .unwrap_or(noxu_util::NULL_LSN);
715                if let Err(e) = crate::vlsn::persist::flush_to_disk_capped(
716                    &vlsn_index,
717                    &home,
718                    cap_lsn,
719                ) {
720                    log::warn!(
721                        "vlsn-flush (final): failed to persist VLSN index: {}",
722                        e
723                    );
724                }
725            })
726            .expect("failed to spawn noxu-vlsn-flush thread");
727
728        self.io_threads.lock().unwrap().push(handle);
729        log::debug!(
730            "Node '{}' VLSN persistence daemon started",
731            self.config.node_name,
732        );
733    }
734
735    /// Spawn the election driver background thread.
736    ///
737    /// While the env is in `Detached` or `Unknown` state and no master
738    /// is known, the driver periodically attempts a Paxos election
739    /// against peers in `GroupService` (whose ELECTION services were
740    /// registered at `open()` time).  On success the driver calls
741    /// `become_master` (if this node is the winner) or `become_replica`
742    /// (otherwise).  On failure (no quorum), the driver waits
743    /// `config.election_timeout` and tries again.
744    ///
745    /// The driver respects `io_shutdown`; on env close the loop exits
746    /// promptly.
747    ///
748    /// Idempotent: a second call is a no-op (only one driver thread is
749    /// ever spawned per env).
750    pub fn start_election_driver(self: &Arc<Self>) {
751        use std::sync::atomic::Ordering;
752        // Reuse io_shutdown for cancellation; a successful spawn is
753        // recorded by appending to io_threads, so a duplicate call
754        // would re-add a thread — we use a one-shot `AtomicBool`
755        // sentinel placed in the io_shutdown's slot via a new field.
756        // Cheaper: a static name check on io_threads is impossible;
757        // instead, gate spawning on whether any io_thread already
758        // carries the driver name.
759        {
760            let threads = self.io_threads.lock().unwrap();
761            if threads.iter().any(|h| {
762                h.thread()
763                    .name()
764                    .is_some_and(|n| n.starts_with("noxu-election-"))
765            }) {
766                return;
767            }
768        }
769
770        let me = Arc::clone(self);
771        let name = format!("noxu-election-{}", self.config.node_name);
772        let handle = std::thread::Builder::new()
773            .name(name)
774            .spawn(move || {
775                me.run_election_loop();
776            })
777            .expect("failed to spawn election driver thread");
778        self.io_threads.lock().unwrap().push(handle);
779        log::debug!("Node '{}' election driver started", self.config.node_name,);
780        // Keep ordering sane on the io_shutdown flag.
781        let _ = self.io_shutdown.load(Ordering::SeqCst);
782    }
783
784    /// Body of the election driver loop.  Public only for tests; called
785    /// by [`Self::start_election_driver`].
786    fn run_election_loop(self: Arc<Self>) {
787        use std::sync::atomic::Ordering;
788        // Maintain an internal monotonically increasing election term.
789        // Each successful or failed round bumps the term so retries do
790        // not collide with stale acceptor promises.
791        let mut term: u64 = 1;
792
793        loop {
794            if self.io_shutdown.load(Ordering::SeqCst) {
795                return;
796            }
797            if self.is_shutdown() {
798                return;
799            }
800
801            let state = self.node_state.get_state();
802            // Stop driving once we've resolved into Master/Replica;
803            // re-arm only if the node returns to Unknown.
804            if matches!(state, NodeState::Master | NodeState::Replica) {
805                std::thread::sleep(Duration::from_millis(200));
806                continue;
807            }
808            if matches!(state, NodeState::Shutdown) {
809                return;
810            }
811
812            // Probe peers for an active master via the existing
813            // GroupService cache.  In the absence of a heartbeat path
814            // we rely on master_tracker (set by become_replica from
815            // the receive loop).
816            if let Some(master_name) = self.master_tracker.get_master()
817                && master_name != self.config.node_name
818            {
819                let _ = self.become_replica(&master_name);
820                continue;
821            }
822
823            // Snapshot peers to dial for ELECTION.
824            let peers: Vec<(String, SocketAddr)> = self
825                .group_service
826                .get_all_nodes()
827                .into_iter()
828                .filter(|n| n.name != self.config.node_name)
829                .filter_map(|n| {
830                    format!("{}:{}", n.host, n.port)
831                        .parse::<SocketAddr>()
832                        .ok()
833                        .map(|a| (n.name, a))
834                })
835                .collect();
836
837            // Build the local rep group view used by run_election to
838            // compute quorum and resolve the winner name.  Include
839            // self.
840            let group = self.local_rep_group_with_self();
841
842            // Update election state for any concurrent acceptor calls.
843            let our_vlsn = self.vlsn_index.get_latest_vlsn();
844            self.election_state.set_vlsn(our_vlsn);
845            self.election_state.set_term(term);
846            // D2: advertise our DTVLSN as the major election-ranking key.
847            self.election_state.set_dtvlsn(self.get_dtvlsn());
848
849            // Connect to each peer's ELECTION service.  Failures are
850            // tolerated: a peer that doesn't answer simply contributes
851            // no vote.  The election may still reach quorum in the
852            // remaining peers.
853            let mut channels: Vec<Arc<dyn crate::net::channel::Channel>> =
854                Vec::new();
855            for (peer_name, addr) in &peers {
856                match crate::net::service_dispatcher::connect_to_service(
857                    *addr,
858                    ELECTION_SERVICE_NAME,
859                ) {
860                    Ok(ch) => {
861                        let arc: Arc<dyn crate::net::channel::Channel> =
862                            Arc::new(ch);
863                        channels.push(arc);
864                    }
865                    Err(e) => {
866                        log::trace!(
867                            "election driver: peer {} ({}) unreachable: {}",
868                            peer_name,
869                            addr,
870                            e
871                        );
872                    }
873                }
874            }
875
876            // Resolve our own node_id from the group; if not present
877            // we cannot run an election (closed-world guard — see F22).
878            let self_node_id =
879                group.get_node(&self.config.node_name).map(|n| n.node_id());
880            let self_node_id = match self_node_id {
881                Some(id) => id,
882                None => {
883                    log::warn!(
884                        "election driver: node '{}' not registered in \
885                         own group view; sleeping",
886                        self.config.node_name
887                    );
888                    std::thread::sleep(Duration::from_millis(200));
889                    continue;
890                }
891            };
892
893            log::debug!(
894                "election driver on '{}': starting term={} with {} peers",
895                self.config.node_name,
896                term,
897                channels.len(),
898            );
899            let outcome = crate::elections::paxos::run_election_with_phi_dtvlsn(
900                self_node_id,
901                &self.config.node_name,
902                &group,
903                &channels,
904                our_vlsn,
905                /* priority */ 1,
906                term,
907                /* own_dtvlsn (D2 major ranking key) */
908                self.get_dtvlsn(),
909                None,
910                std::time::Duration::from_millis(500),
911            );
912
913            match outcome {
914                Some(winner_id) if winner_id == self_node_id => {
915                    if let Err(e) = self.become_master(term) {
916                        log::warn!(
917                            "election driver: become_master failed: {}",
918                            e
919                        );
920                    } else {
921                        log::info!(
922                            "election driver: '{}' became master at term {}",
923                            self.config.node_name,
924                            term,
925                        );
926                    }
927                }
928                Some(winner_id) => {
929                    if let Some(winner_node) = group
930                        .get_nodes()
931                        .into_iter()
932                        .find(|n| n.node_id() == winner_id)
933                    {
934                        if let Err(e) = self.become_replica(&winner_node.name) {
935                            log::warn!(
936                                "election driver: become_replica failed: {}",
937                                e
938                            );
939                        } else {
940                            log::info!(
941                                "election driver: '{}' became replica of '{}' at term {}",
942                                self.config.node_name,
943                                winner_node.name,
944                                term,
945                            );
946                        }
947                    }
948                }
949                None => {
950                    log::debug!(
951                        "election driver on '{}' term={}: no quorum",
952                        self.config.node_name,
953                        term,
954                    );
955                }
956            }
957
958            term = term.saturating_add(1);
959            // Back off so we don't pin the loop on transient failures.
960            std::thread::sleep(
961                self.config.election_timeout.min(Duration::from_millis(500)),
962            );
963        }
964    }
965
966    /// Internal: a `RepGroup` snapshot that includes self.
967    fn local_rep_group_with_self(&self) -> crate::rep_group::RepGroup {
968        let mut group = self.get_rep_group();
969        // Ensure self is present in the group view; the
970        // group_service does not auto-register the local node.
971        if group.get_node(&self.config.node_name).is_none() {
972            let mut self_node = crate::rep_node::RepNode::new(
973                self.config.node_name.clone(),
974                self.config.node_type,
975                self.config.node_host.clone(),
976                self.config.node_port,
977                /* node_id */ 0,
978            );
979            // Stable self node_id derived from the name hash so
980            // re-creations in the same process don't collide.
981            use std::hash::{Hash, Hasher};
982            let mut hasher = std::collections::hash_map::DefaultHasher::new();
983            self.config.node_name.hash(&mut hasher);
984            // Restrict to a u32 range and avoid 0 (reserved for
985            // "unknown").
986            let id = ((hasher.finish() as u32) | 1).max(1);
987            self_node.node_id = id;
988            group.add_node(self_node);
989        }
990        group
991    }
992
993    /// Return the socket address the TCP service dispatcher is bound to.
994    ///
995    /// This may differ from the configured `node_port` when port 0 is used
996    /// (the OS assigns an ephemeral port). Returns `None` if the dispatcher
997    /// could not be started (e.g. the address is not resolvable).
998    pub fn bound_addr(&self) -> Option<SocketAddr> {
999        self.bound_addr
1000    }
1001
1002    /// Wire a live `EnvironmentImpl` into this replicated environment.
1003    ///
1004    /// After this call, state transitions (`become_master`, `become_replica`)
1005    /// will spawn real feeder/receiver I/O threads backed by the live log.
1006    ///
1007    /// If the RESTORE service was not registered at construction time (because
1008    /// `config.env_home` was `None`), it is registered here using the
1009    /// environment's actual home path.  This mirrors`RepNode.envSetup()`
1010    /// which registers the restore handler during environment wiring.
1011    ///
1012    /// Environment reference wiring.
1013    /// `EnvironmentImpl` via `RepImpl.repNode.envImpl` in HA.
1014    pub fn with_environment(&self, env: Arc<EnvironmentImpl>) {
1015        // Register RESTORE service lazily if not already done.
1016        if !self.restore_registered.load(Ordering::SeqCst)
1017            && let Some(ref dispatcher) = self.tcp_dispatcher
1018        {
1019            let env_home = env.get_env_home().to_path_buf();
1020            let restore_server = NetworkRestoreServer::new(env_home.clone());
1021            dispatcher.register(RESTORE_SERVICE_NAME, Arc::new(restore_server));
1022            self.restore_registered.store(true, Ordering::SeqCst);
1023            log::debug!(
1024                "Node '{}' RESTORE service registered via with_environment \
1025                 (env_home={})",
1026                self.config.node_name,
1027                env_home.display(),
1028            );
1029        }
1030
1031        // X-14: rebuild the VLSN index from recovery-replayed LN entries.
1032        // After a crash the on-disk vlsn.idx may be stale (either ahead of
1033        // the recovered B-tree, or behind if vlsn.idx was not flushed
1034        // after the last checkpoint).  Re-registering all (vlsn, lsn) pairs
1035        // from the redo pass gives a consistent in-memory index.
1036        if !env.recovery_vlsns.is_empty() {
1037            log::info!(
1038                "Node '{}': rebuilding VLSN index from {} recovered entries",
1039                self.config.node_name,
1040                env.recovery_vlsns.len(),
1041            );
1042            for &(vlsn, lsn_u64) in &env.recovery_vlsns {
1043                let lsn = noxu_util::Lsn::from_u64(lsn_u64);
1044                self.vlsn_index.register(
1045                    vlsn,
1046                    lsn.file_number(),
1047                    lsn.file_offset(),
1048                );
1049            }
1050        }
1051
1052        // X-1: truncate the VLSN index to the rollback matchpoint if recovery
1053        // detected a completed rollback period.  The matchpoint is the highest
1054        // LSN that is still valid after the rollback; entries with higher VLSNs
1055        // correspond to data that was rolled back and must not appear in the
1056        // index.
1057        if let Some(matchpoint_lsn_u64) = env.recovery_rollback_matchpoint {
1058            // Find the latest VLSN whose LSN is at or before the matchpoint.
1059            // Scan the recovered VLSN pairs (sorted ascending) to find the
1060            // boundary.
1061            let safe_vlsn = env
1062                .recovery_vlsns
1063                .iter()
1064                .rev()
1065                .find(|&&(_, lsn_u64)| lsn_u64 <= matchpoint_lsn_u64)
1066                .map(|&(vlsn, _)| vlsn)
1067                .unwrap_or(0);
1068            log::info!(
1069                "Node '{}': truncating VLSN index after vlsn={} \
1070                 (rollback matchpoint lsn={:#x})",
1071                self.config.node_name,
1072                safe_vlsn,
1073                matchpoint_lsn_u64,
1074            );
1075            self.vlsn_index.truncate_after(safe_vlsn);
1076        }
1077
1078        *self.env_impl.lock().unwrap() = Some(Arc::clone(&env));
1079
1080        // C-C2b: install the VLSN counter so log_txn_commit writes
1081        // VLSN-tagged headers.  When become_master then spawns an
1082        // EnvironmentLogScanner-backed FeederRunner, it will find these
1083        // entries and auto-feed them to replicas without any
1084        // replicate_entry call from the application.
1085        env.set_replication_vlsn_counter(Arc::clone(&self.wal_vlsn_counter));
1086    }
1087
1088    /// Get the current node state.
1089    ///
1090    ///
1091    ///
1092    /// Returns the current state of the node associated with this replication
1093    /// environment. If the caller's intent is to track the state of the node,
1094    /// `StateChangeListener` may be a more convenient and efficient approach.
1095    pub fn get_state(&self) -> NodeState {
1096        self.node_state.get_state()
1097    }
1098
1099    /// Check if this node is the master.
1100    ///
1101    /// Returns true if the node's current state is Master.
1102    pub fn is_master(&self) -> bool {
1103        self.node_state.get_state() == NodeState::Master
1104    }
1105
1106    /// Returns true if this node is an *authoritative* master (D4, JE
1107    /// `ElectionQuorum.isAuthoritativeMaster`): it is the group master AND it
1108    /// is still connected to enough replicas that, including itself, a
1109    /// SIMPLE_MAJORITY quorum is present.
1110    ///
1111    /// A master on the minority side of a network partition is NOT
1112    /// authoritative — it must not claim the special election ranking
1113    /// (`MASTER_RANKING`) nor (eventually) continue accepting writes, so the
1114    /// majority side can elect a fresh master without it competing
1115    /// (split-brain prevention).
1116    ///
1117    /// "Active replica count" = the number of currently-connected push-feeder
1118    /// runners serving *electable* peers (Monitors/Secondaries do not count
1119    /// toward the election quorum). `+ 1` for this master itself.
1120    pub fn is_authoritative_master(&self) -> bool {
1121        if !self.is_master() {
1122            return false;
1123        }
1124        let group = self.get_rep_group();
1125        // Total electable nodes (incl. self) — peers + this master.
1126        let electable_total: usize = group
1127            .get_nodes()
1128            .iter()
1129            .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
1130            .count()
1131            + 1; // +1 for self/master (not registered as a peer)
1132
1133        // Active replicas = connected feeder runners whose peer is electable.
1134        let active_electable_replicas: usize = {
1135            let runners = self.active_feeder_runners.lock().unwrap();
1136            runners
1137                .keys()
1138                .filter(|name| {
1139                    group
1140                        .get_node(name)
1141                        .map(|n| {
1142                            n.node_type == crate::node_type::NodeType::Electable
1143                        })
1144                        .unwrap_or(false)
1145                })
1146                .count()
1147        };
1148        Self::authoritative_quorum_met(
1149            active_electable_replicas,
1150            electable_total,
1151        )
1152    }
1153
1154    /// Pure SIMPLE_MAJORITY quorum check for `is_authoritative_master` (JE
1155    /// `ElectionQuorum.isAuthoritativeMaster`): `(activeReplicas + 1) >=
1156    /// quorumSize` where `quorumSize = electableTotal / 2 + 1`.
1157    fn authoritative_quorum_met(
1158        active_electable_replicas: usize,
1159        electable_total: usize,
1160    ) -> bool {
1161        let quorum_size = electable_total / 2 + 1;
1162        (active_electable_replicas + 1) >= quorum_size
1163    }
1164
1165    /// Check if this node is a replica.
1166    ///
1167    /// Returns true if the node's current state is Replica.
1168    pub fn is_replica(&self) -> bool {
1169        self.node_state.get_state() == NodeState::Replica
1170    }
1171
1172    /// Returns true if the node is currently participating in the group
1173    /// as a Replica or a Master.
1174    pub fn is_active(&self) -> bool {
1175        self.node_state.get_state().is_active()
1176    }
1177
1178    /// Get the node name.
1179    ///
1180    ///
1181    ///
1182    /// Returns the unique name used to identify this replicated environment.
1183    pub fn get_node_name(&self) -> &str {
1184        self.config.node_name.as_str()
1185    }
1186
1187    /// Get the group name.
1188    ///
1189    /// Returns the name of the replication group this node belongs to.
1190    pub fn get_group_name(&self) -> &str {
1191        self.config.group_name.as_str()
1192    }
1193
1194    /// Get the current master (if known).
1195    ///
1196    /// Returns the name of the node that is currently the master, or None
1197    /// if the master is not known (e.g. the node is in the Unknown or
1198    /// Detached state).
1199    pub fn get_master_name(&self) -> Option<String> {
1200        self.master_tracker.get_master()
1201    }
1202
1203    /// Get the replication group info.
1204    ///
1205    ///
1206    ///
1207    /// Returns a description of the replication group as known by this node.
1208    /// The replicated group metadata is stored in a replicated database and
1209    /// updates are propagated by the current master node to all replicas. If
1210    /// this node is not the master, it is possible for its description of the
1211    /// group to be out of date.
1212    pub fn get_group(&self) -> &GroupService {
1213        &self.group_service
1214    }
1215
1216    /// Add a peer node to the replication group at runtime.
1217    ///
1218    /// The node is registered in the `GroupService` so elections and quorum
1219    /// calculations immediately reflect the new membership.
1220    pub fn add_peer(&self, node: crate::rep_node::RepNode) -> Result<()> {
1221        use crate::group_service::NodeInfo;
1222        use std::time::Instant;
1223
1224        let info = NodeInfo {
1225            name: node.name.clone(),
1226            node_type: node.node_type,
1227            host: node.host.clone(),
1228            port: node.port,
1229            node_id: node.node_id,
1230            joined_at: Instant::now(),
1231            last_seen: Instant::now(),
1232            is_active: true,
1233            known_vlsn: 0,
1234            log_range: None,
1235            read_capacity_pct: node.read_capacity_pct,
1236            write_capacity_pct: node.write_capacity_pct,
1237            latency_hint_ms: node.latency_hint_ms,
1238        };
1239        self.group_service.add_node(info)?;
1240        log::info!(
1241            "Node '{}': added peer '{}' ({}:{}) to group '{}'",
1242            self.config.node_name,
1243            node.name,
1244            node.host,
1245            node.port,
1246            self.config.group_name,
1247        );
1248
1249        // F9: if we are the current master, immediately register a
1250        // `Feeder` tracker for the new peer so AckTracker bookkeeping
1251        // and downstream pull-based streaming work without a forced
1252        // re-election.
1253        if self.is_master()
1254            && (node.node_type == crate::node_type::NodeType::Electable
1255                || node.node_type == crate::node_type::NodeType::Secondary)
1256        {
1257            let mut feeders = self.feeders.write();
1258            if !feeders.iter().any(|f| f.get_replica_name() == node.name) {
1259                feeders.push(Feeder::new(node.name.clone()));
1260                log::debug!(
1261                    "Node '{}' (master): dispatched Feeder for new peer '{}'",
1262                    self.config.node_name,
1263                    node.name,
1264                );
1265            }
1266        }
1267        Ok(())
1268    }
1269
1270    /// Remove a peer node from the replication group by name.
1271    ///
1272    /// The node is deregistered from the `GroupService`.  Elections initiated
1273    /// after this call will not include the removed node in quorum calculations.
1274    pub fn remove_peer(&self, name: &str) -> Result<()> {
1275        self.group_service.remove_node(name)?;
1276        log::info!(
1277            "Node '{}': removed peer '{}' from group '{}'",
1278            self.config.node_name,
1279            name,
1280            self.config.group_name,
1281        );
1282        Ok(())
1283    }
1284
1285    /// Update the capacity and latency metadata of an existing peer.
1286    ///
1287    /// Only the following fields are updated from `node`:
1288    ///   - `read_capacity_pct`
1289    ///   - `write_capacity_pct`
1290    ///   - `latency_hint_ms`
1291    ///
1292    /// The node's identity (name, address, port, node_type) is preserved.
1293    /// Safe to call while replication is active.
1294    ///
1295    /// If the quorum policy is `Flexible` or `Expression`, the quorum system
1296    /// is rebuilt to reflect the new capacity/latency weights.
1297    ///
1298    /// # Note
1299    ///
1300    /// `update_peer_metadata` does not currently re-run
1301    /// `QuorumPolicy::validate(electable_count)` after the metadata
1302    /// change.  An LP-optimal `Expression` quorum that was safe before
1303    /// the update may no longer satisfy the intersection property
1304    /// afterwards.  Until automatic revalidation lands, deployments
1305    /// using `QuorumPolicy::Expression` should call
1306    /// `quorum_policy().validate(get_rep_group().electable_count())`
1307    /// on the returned `RepGroup` after every metadata change and
1308    /// fail the operator-facing operation if validation reports
1309    /// unsafety.
1310    pub fn update_peer_metadata(
1311        &self,
1312        name: &str,
1313        node: crate::rep_node::RepNode,
1314    ) -> Result<()> {
1315        self.group_service.update_node_metadata(
1316            name,
1317            node.read_capacity_pct,
1318            node.write_capacity_pct,
1319            node.latency_hint_ms,
1320        )?;
1321        log::info!(
1322            "Node '{}': updated metadata for peer '{}' \
1323             (read_cap={}, write_cap={}, latency={}ms)",
1324            self.config.node_name,
1325            name,
1326            node.read_capacity_pct,
1327            node.write_capacity_pct,
1328            node.latency_hint_ms,
1329        );
1330        Ok(())
1331    }
1332
1333    /// Returns a snapshot of the current replication group as a `RepGroup`.
1334    ///
1335    /// The snapshot reflects the state at the time of the call; subsequent
1336    /// `add_peer` / `remove_peer` calls are not reflected in it.
1337    pub fn get_rep_group(&self) -> crate::rep_group::RepGroup {
1338        use crate::rep_group::RepGroup;
1339
1340        let mut group = RepGroup::new(
1341            self.config.group_name.clone(),
1342            self.group_service.get_group_id(),
1343        );
1344        for info in self.group_service.get_all_nodes() {
1345            let mut node = crate::rep_node::RepNode::new(
1346                info.name.clone(),
1347                info.node_type,
1348                info.host.clone(),
1349                info.port,
1350                info.node_id,
1351            );
1352            node.read_capacity_pct = info.read_capacity_pct;
1353            node.write_capacity_pct = info.write_capacity_pct;
1354            node.latency_hint_ms = info.latency_hint_ms;
1355            group.add_node(node);
1356        }
1357        group
1358    }
1359
1360    /// Get the replication configuration.
1361    ///
1362    ///
1363    ///
1364    /// Returns the replication configuration that has been used to create this
1365    /// environment.
1366    pub fn get_config(&self) -> &RepConfig {
1367        &self.config
1368    }
1369
1370    /// Get the current VLSN range on this node.
1371    ///
1372    /// Returns the range of VLSNs currently available on this node.
1373    pub fn get_vlsn_range(&self) -> VlsnRange {
1374        self.vlsn_index.get_range()
1375    }
1376
1377    /// Get the latest VLSN.
1378    ///
1379    /// Returns the most recent VLSN registered on this node.
1380    pub fn get_current_vlsn(&self) -> u64 {
1381        self.vlsn_index.get_latest_vlsn()
1382    }
1383
1384    /// The replica-side replication stream state (master high-water, applied
1385    /// VLSN, lag).  Used by the consistency read-gate to learn the master's
1386    /// latest known commit VLSN (JE `ConsistencyTracker.masterTxnEndVLSN`,
1387    /// updated by heartbeats).
1388    pub fn replica_stream(&self) -> &ReplicaStream {
1389        &self.replica_stream
1390    }
1391
1392    /// REP-10 (B): mint a [`CommitToken`] for the most recent commit on this
1393    /// master.
1394    ///
1395    /// Port of `MasterTxn.getCommitToken`: returns
1396    /// `new CommitToken(envUUID, commitVLSN.getSequence())`.  A client that
1397    /// just performed a write on the master calls this to obtain the token it
1398    /// will hand to a subsequent replica read
1399    /// (`Transaction.getCommitToken`).  Returns `None` on a non-master or when
1400    /// no commit VLSN exists yet (JE returns `null` when `commitVLSN.isNull`).
1401    ///
1402    /// The token's VLSN is the master's latest assigned VLSN — the same
1403    /// `wal_vlsn_counter` high-water the ack gate keys on (the commit was
1404    /// logged immediately before this call).
1405    pub fn commit_token(&self) -> Option<crate::CommitToken> {
1406        if !self.is_master() {
1407            return None;
1408        }
1409        let vlsn = self.wal_vlsn_counter.load(Ordering::Acquire);
1410        crate::CommitToken::new(self.config.group_name.clone(), vlsn)
1411    }
1412
1413    /// REP-10 (C): the read-gate. Enforce a replica read-consistency policy
1414    /// before a read transaction proceeds.
1415    ///
1416    /// Port of `ReplicaConsistencyPolicy.ensureConsistency` as invoked from a
1417    /// replica `beginTransaction` (`RepImpl.checkConsistency` /
1418    /// `Replica.getConsistencyTracker().awaitVLSN`).  Called by the replica
1419    /// env's transaction-begin / read path.
1420    ///
1421    /// - `policy_override`: a per-transaction policy (JE
1422    ///   `TransactionConfig.setConsistencyPolicy`).  When `None`, the node's
1423    ///   configured default is used (`ReplicationConfig.setConsistencyPolicy`
1424    ///   — [`RepConfig::consistency_policy`]).
1425    ///
1426    /// On a master, or when the effective policy is
1427    /// [`ConsistencyPolicy::NoConsistency`], this returns immediately so
1428    /// existing behaviour is unchanged unless a policy is set.  On a replica
1429    /// with a non-`NoConsistency` policy it BLOCKS until the replica has
1430    /// replayed far enough or the policy timeout expires (a clean
1431    /// [`RepError`], never a hang).
1432    pub fn begin_read_consistency(
1433        &self,
1434        policy_override: Option<&crate::ConsistencyPolicy>,
1435    ) -> Result<()> {
1436        // Resolve the effective policy: per-txn override else node default.
1437        let default_policy = self.config.consistency_policy.clone();
1438        let policy = policy_override.unwrap_or(&default_policy);
1439
1440        // NoConsistency never blocks (the master path also lands here).
1441        if matches!(policy, crate::ConsistencyPolicy::NoConsistency) {
1442            return Ok(());
1443        }
1444
1445        // A non-No policy only makes sense on a replica with a live replay
1446        // (its last_applied_vlsn is the wait predicate).  Without a tracker
1447        // there is nothing to wait on — treat as immediately consistent
1448        // rather than block forever (e.g. on the master, which is by
1449        // definition fully current).
1450        let tracker = self.consistency_tracker.lock().unwrap().clone();
1451        let Some(tracker) = tracker else {
1452            return Ok(());
1453        };
1454
1455        // Surface the master's latest known VLSN for the time policy
1456        // (heartbeat / feeder high-water).  JE ConsistencyTracker tracks this
1457        // via trackHeartbeat; here we read the replica_stream high-water.
1458        let master_vlsn = self.replica_stream.get_master_vlsn();
1459        if master_vlsn > 0 {
1460            tracker.set_master_vlsn(master_vlsn);
1461        }
1462
1463        tracker.await_consistency(policy)
1464    }
1465
1466    /// REP-10 (C) test seam: install a [`ConsistencyTracker`] over an existing
1467    /// `last_applied_vlsn` handle, exactly as `become_replica` does when it
1468    /// starts the live replay thread.
1469    ///
1470    /// Lets a test drive a real [`noxu_dbi::ReplicaReplay`] and exercise
1471    /// [`Self::begin_read_consistency`] end-to-end without standing up TCP
1472    /// feeder/receiver threads.  Not part of the production API.
1473    #[cfg(any(test, feature = "test-harness"))]
1474    pub fn install_consistency_tracker_for_test(
1475        &self,
1476        last_applied_vlsn: std::sync::Arc<std::sync::atomic::AtomicU64>,
1477    ) -> crate::ConsistencyTracker {
1478        let tracker = crate::ConsistencyTracker::new(last_applied_vlsn);
1479        *self.consistency_tracker.lock().unwrap() = Some(tracker.clone());
1480        tracker
1481    }
1482
1483    /// REP-1 STEP 5 (D): run a live syncup against `feeder` and, if this
1484    /// replica's tail diverged, ROLL IT BACK to the common matchpoint instead
1485    /// of falling back to a network restore.
1486    ///
1487    /// Port of the replica's side of JE `ReplicaFeederSyncup.execute`:
1488    /// `findMatchpoint` → `verifyRollback` → `replay.rollback` →
1489    /// `vlsnIndex.truncateFromTail` → resume streaming from `matchpoint + 1`.
1490    ///
1491    /// `feeder` is the master's [`crate::stream::syncup::SyncupView`] (built
1492    /// from its VLSN index, or exchanged over the syncup wire protocol in
1493    /// [`crate::stream::syncup_protocol`]). The decision uses the same pure
1494    /// core the protocol drives: `find_matchpoint` + `verify_rollback`.
1495    ///
1496    /// Returns:
1497    /// - [`SyncupAction::RolledBack`] — the divergent tail was truncated to
1498    ///   the matchpoint; resume streaming from `start_vlsn`. The non-diverged
1499    ///   case (matchpoint == last VLSN) returns `RolledBack` with an empty
1500    ///   tail and is a no-op rollback.
1501    /// - [`SyncupAction::NeedsRestore`] — `verify_rollback` selected
1502    ///   NetworkRestore (no common matchpoint) or HardRecovery (the rollback
1503    ///   would cross a committed/aborted txn); the caller must network-restore
1504    ///   per JE.
1505    ///
1506    /// The non-diverged fast path (the replica's range is a prefix of the
1507    /// feeder's) is still served by the range-check `negotiate_syncup`
1508    /// (`SyncupResult::CanServe`) in the streaming path; this method is the
1509    /// DIVERGED case.
1510    pub fn syncup_with_feeder(
1511        &self,
1512        feeder: &dyn crate::stream::syncup::SyncupView,
1513    ) -> Result<SyncupAction> {
1514        // Build the replica's SyncupView. When a real LogManager is wired,
1515        // re-read the log (SyncupLogView) so the per-VLSN fingerprint is the
1516        // actual record checksum (JE ReplicaSyncupReader). Otherwise (the
1517        // VLSN-index-only harness model) fall back to the index view, whose
1518        // fingerprint is the LSN.
1519        let log_view: Option<crate::stream::syncup_reader::SyncupLogView> =
1520            self.env_impl.lock().unwrap().clone().and_then(|env| {
1521                if let Some(lm) = env.get_log_manager() {
1522                    // Flush so all VLSN-tagged entries are on disk before the
1523                    // backward re-read (JE flushNoSync in initScan).
1524                    let _ = lm.flush_sync();
1525                }
1526                crate::stream::syncup_reader::SyncupLogView::scan(
1527                    env.get_env_home(),
1528                )
1529            });
1530        let index_view = VlsnIndexView::from_index(&self.vlsn_index);
1531        let replica_view: &dyn crate::stream::syncup::SyncupView =
1532            match &log_view {
1533                Some(v) => v,
1534                None => &index_view,
1535            };
1536
1537        let range = self.vlsn_index.get_range();
1538        let last_sync = range.get_last_sync();
1539        let last_txn_end = range.get_last_txn_end();
1540        let to_vlsn = |v: u64| {
1541            if v == 0 {
1542                noxu_util::NULL_VLSN
1543            } else {
1544                noxu_util::Vlsn::new(v as i64)
1545            }
1546        };
1547
1548        // Step 1: find the matchpoint (JE findMatchpoint).
1549        let matchpoint = find_matchpoint(replica_view, feeder);
1550
1551        // numPassedCommits: count of txn ends strictly above the matchpoint.
1552        // When we re-read the log, count them exactly; otherwise rely on the
1553        // numeric `lastTxnEnd <= matchpoint` test in verify_rollback (which
1554        // matches JE when sync points == txn ends).
1555        let num_passed_commits = match (&log_view, &matchpoint) {
1556            (Some(v), Matchpoint::Found { vlsn, .. }) => {
1557                v.num_passed_commits(*vlsn)
1558            }
1559            _ => 0,
1560        };
1561        let decision = verify_rollback(
1562            &matchpoint,
1563            to_vlsn(last_txn_end),
1564            to_vlsn(last_sync),
1565            num_passed_commits,
1566        );
1567
1568        match decision {
1569            RollbackDecision::RollbackToMatchpoint {
1570                matchpoint_vlsn,
1571                start_vlsn,
1572            } => {
1573                let matchpoint_lsn = match &matchpoint {
1574                    Matchpoint::Found { lsn, .. } => *lsn,
1575                    Matchpoint::None => 0,
1576                };
1577                // Collect the rolled-back LSNs (VLSNs strictly above the
1578                // matchpoint). When the real log was re-read, use its EXACT
1579                // per-VLSN LSNs so make-invisible flips the right header bytes
1580                // (the sparse VLSN index only stores boundary/last LSNs).
1581                let mp = matchpoint_vlsn.sequence().max(0) as u64;
1582                let rollback_lsns: Vec<noxu_util::Lsn> = match &log_view {
1583                    Some(v) => v
1584                        .entries()
1585                        .filter(|(vlsn, _)| (vlsn.sequence() as u64) > mp)
1586                        .map(|(_, e)| noxu_util::Lsn::from_u64(e.lsn))
1587                        .collect(),
1588                    None => self
1589                        .vlsn_index
1590                        .snapshot_entries()
1591                        .into_iter()
1592                        .filter(|(vlsn, _, _)| *vlsn > mp)
1593                        .map(|(_, file, offset)| {
1594                            noxu_util::Lsn::new(file, offset)
1595                        })
1596                        .collect(),
1597                };
1598                self.execute_rollback(mp, matchpoint_lsn, &rollback_lsns)?;
1599                Ok(SyncupAction::RolledBack {
1600                    matchpoint_vlsn: mp,
1601                    start_vlsn: start_vlsn.sequence().max(0) as u64,
1602                })
1603            }
1604            RollbackDecision::HardRecovery { .. }
1605            | RollbackDecision::NetworkRestore => {
1606                Ok(SyncupAction::NeedsRestore)
1607            }
1608        }
1609    }
1610
1611    /// Execute the durable + in-memory rollback to `matchpoint_vlsn`
1612    /// (LSN `matchpoint_lsn`). Port of JE `Replay.rollback` +
1613    /// `vlsnIndex.truncateFromTail`.
1614    ///
1615    /// Durable steps (RollbackStart/End + make-invisible + fsync) go through
1616    /// [`noxu_recovery::rollback`] when a `LogManager` is wired; the VLSN index
1617    /// is always truncated to the matchpoint so the reported range matches the
1618    /// rolled-back state and streaming resumes from `matchpoint + 1`.
1619    fn execute_rollback(
1620        &self,
1621        matchpoint_vlsn: u64,
1622        matchpoint_lsn: u64,
1623        rollback_lsns: &[noxu_util::Lsn],
1624    ) -> Result<()> {
1625        // Durable rollback (RollbackStart … make-invisible … RollbackEnd) when
1626        // a live LogManager is available. The harness-level env (VLSN-index
1627        // only, no LogManager) skips the on-disk steps; the index truncation
1628        // below is what makes the replica converge in that model.
1629        if let Some(env) = self.env_impl.lock().unwrap().clone()
1630            && let Some(log_mgr) = env.get_log_manager()
1631            && matchpoint_lsn != 0
1632        {
1633            let mp_lsn = noxu_util::Lsn::from_u64(matchpoint_lsn);
1634            // active_txn_ids: the harness/VLSN-index model has no live txn
1635            // table here; the durable RollbackStart records an empty set, and
1636            // the per-txn gating (REP-1 STEP 2) applies during recovery when
1637            // the analysis pass rebuilds the active set. A future pass can
1638            // thread the live ReplayTxn ids through (JE
1639            // localActiveTxns.keySet()).
1640            noxu_recovery::rollback(
1641                &log_mgr,
1642                noxu_util::Vlsn::new(matchpoint_vlsn as i64),
1643                mp_lsn,
1644                Vec::new(),
1645                rollback_lsns,
1646            )
1647            .map_err(|e| {
1648                RepError::DatabaseError(format!(
1649                    "live rollback to matchpoint failed: {e}"
1650                ))
1651            })?;
1652        }
1653
1654        // JE vlsnIndex.truncateFromTail(startVLSN, matchpointLSN): drop the
1655        // divergent VLSN tail so the reported range matches the recovered
1656        // state and streaming resumes from matchpoint + 1.
1657        self.vlsn_index.truncate_after(matchpoint_vlsn);
1658
1659        log::info!(
1660            "Node '{}': live syncup rolled back to matchpoint vlsn={} \
1661             (lsn={:#x}); {} tail entries truncated",
1662            self.config.node_name,
1663            matchpoint_vlsn,
1664            matchpoint_lsn,
1665            rollback_lsns.len(),
1666        );
1667        Ok(())
1668    }
1669
1670    /// Test-only: clone the env's SHARED VLSN index `Arc`.
1671    ///
1672    /// REP-6: the replica receive loop (`become_replica` ->
1673    /// `EnvironmentLogWriter`) must feed THIS index — the one
1674    /// `get_vlsn_range`, `flush_to_disk`, and election ranking read — not a
1675    /// throwaway. Tests use this to build a writer the same way
1676    /// `become_replica` does and assert the shared index advances.
1677    #[cfg(feature = "test-harness")]
1678    pub fn vlsn_index_arc(&self) -> Arc<crate::vlsn::vlsn_index::VlsnIndex> {
1679        Arc::clone(&self.vlsn_index)
1680    }
1681
1682    /// Return the list of replica names that currently have a `Feeder`
1683    /// tracker on this (master) node.
1684    ///
1685    /// Used by tests and operator tooling.  The returned list reflects
1686    /// the master's view at the time of the call; subsequent
1687    /// `add_peer`/`remove_peer` calls may change it.
1688    pub fn feeder_replica_names(&self) -> Vec<String> {
1689        self.feeders.read().iter().map(|f| f.get_replica_name()).collect()
1690    }
1691
1692    // -----------------------------------------------------------------------
1693    // C-C2 — active push feeder API
1694    // -----------------------------------------------------------------------
1695
1696    /// Register a channel for pushing log entries to a specific replica.
1697    ///
1698    /// When [`Self::become_master`] is called — or if the node is **already
1699    /// master** — a [`FeederRunner`] background thread is immediately spawned
1700    /// for this channel.  The thread reads from a dedicated in-memory queue
1701    /// that is fed by [`Self::replicate_entry`] / [`Self::apply_entry`], and
1702    /// sends framed log entries to the replica over `channel`.  Acks sent
1703    /// back by the replica are visible via
1704    /// [`Self::active_feeder_runner_acked_vlsn`].
1705    ///
1706    /// # Production vs. test use
1707    ///
1708    /// *Production*: pass a [`crate::net::TcpChannel`] connected to the
1709    /// replica's inbound feeder service.  
1710    /// *Tests*: pass one half of a [`crate::net::LocalChannelPair`].
1711    ///
1712    /// # Note on push vs. pull
1713    ///
1714    /// Registering a channel activates the **push** path: the master
1715    /// initiates and owns the feeder connection.  The existing **pull** path
1716    /// (`PeerFeederService` / `catch_up_from_peer`) continues to operate in
1717    /// parallel for replicas that connect proactively.  Do not register a
1718    /// channel for a replica that already connects via the pull path, or
1719    /// entries may be delivered twice.
1720    ///
1721    /// If `become_master` was called *before* registering the channel, call
1722    /// this method afterward; it will spawn the FeederRunner immediately.
1723    pub fn register_feeder_channel(
1724        &self,
1725        replica_name: String,
1726        channel: Arc<dyn crate::net::Channel>,
1727    ) {
1728        {
1729            let mut ch = self.feeder_channels.lock().unwrap();
1730            ch.insert(replica_name.clone(), Arc::clone(&channel));
1731        }
1732        if self.is_master() {
1733            self.spawn_feeder_runner(replica_name, channel);
1734        }
1735    }
1736
1737    /// Return the last VLSN acknowledged by the FeederRunner for `replica_name`.
1738    ///
1739    /// Returns `0` if no FeederRunner is currently active for that replica
1740    /// (either `become_master` was not called yet, or no channel was
1741    /// registered).  Use this to poll catch-up progress before shutdown.
1742    pub fn active_feeder_runner_acked_vlsn(&self, replica_name: &str) -> u64 {
1743        self.active_feeder_runners
1744            .lock()
1745            .unwrap()
1746            .get(replica_name)
1747            .map(|r| r.known_replica_vlsn())
1748            .unwrap_or(0)
1749    }
1750
1751    /// Spawn a FeederRunner thread for `replica_name` using `channel`.
1752    ///
1753    /// Creates a dedicated `PeerLogScanner` queue for the replica, registers
1754    /// it in `feeder_queues` so that future `replicate_entry` / `apply_entry`
1755    /// calls fan out into it, spawns the `FeederRunner::run` loop, and
1756    /// records the `Arc<FeederRunner>` in `active_feeder_runners`.
1757    ///
1758    /// Idempotent: if a FeederRunner is already active for `replica_name`
1759    /// (from a prior `become_master` call), it is replaced — the old channel
1760    /// should have been closed already via `close()`.
1761    ///
1762    /// **WAL-scanner auto-feed path (C-C2b)**: when a live `EnvironmentImpl`
1763    /// has been wired via `with_environment`, the FeederRunner thread uses an
1764    /// `EnvironmentLogScanner` as its source.  Every `log_txn_commit` on the
1765    /// master writes a VLSN-tagged WAL entry (22-byte header); the scanner
1766    /// finds these entries and streams them to the replica automatically,
1767    /// without any `replicate_entry` call from the application.
1768    ///
1769    /// **Fallback path**: when no `EnvironmentImpl` is wired the runner reads
1770    /// from the in-memory `PeerLogScanner` queue populated by
1771    /// `replicate_entry` / `apply_entry` — the previous manual behaviour.
1772    fn spawn_feeder_runner(
1773        &self,
1774        replica_name: String,
1775        channel: Arc<dyn crate::net::Channel>,
1776    ) {
1777        // Dedicated entry queue: entries flowing from this master reach the
1778        // FeederRunner without competing with PeerFeederService.
1779        let queue = Arc::new(PeerLogScanner::new());
1780        {
1781            self.feeder_queues
1782                .write()
1783                .unwrap()
1784                .insert(replica_name.clone(), Arc::clone(&queue));
1785        }
1786
1787        // REP-9 Part 1: wire an ack sink so the FeederRunner forwards every
1788        // inbound replica ack to `env.record_ack(vlsn, replica_name)`, which
1789        // reaches BOTH the AckTracker (commit-blocking quorum) and the
1790        // matching `Feeder::acked_vlsn` (DTVLSN ranking).  Without this the
1791        // ack reached only the runner's private `known_replica_vlsn`.  The
1792        // sink holds a `Weak<Self>` so it never extends the env's lifetime;
1793        // if `self_weak` was never initialised we fall back to the plain
1794        // (sink-less) runner — `record_ack` is still reachable from tests.
1795        let runner = match self.self_weak.get().and_then(Weak::upgrade) {
1796            Some(env_arc) => {
1797                let weak = Arc::downgrade(&env_arc);
1798                let sink: crate::stream::feeder::AckSink =
1799                    Arc::new(move |name: &str, vlsn: u64| {
1800                        if let Some(env) = weak.upgrade() {
1801                            env.record_ack(vlsn, name);
1802                        }
1803                    });
1804                Arc::new(FeederRunner::new_with_ack_sink(
1805                    Arc::clone(&channel),
1806                    1,
1807                    replica_name.clone(),
1808                    sink,
1809                ))
1810            }
1811            None => Arc::new(FeederRunner::new(Arc::clone(&channel), 1)),
1812        };
1813        let runner_clone = Arc::clone(&runner);
1814        let replica_clone = replica_name.clone();
1815
1816        // C-C2b: prefer EnvironmentLogScanner (WAL auto-feed) when env is
1817        // wired; fall back to in-memory queue (manual replicate_entry path)
1818        // otherwise.
1819        let env_opt = self.env_impl.lock().unwrap().clone();
1820
1821        let handle = std::thread::Builder::new()
1822            .name(format!("noxu-feeder-{}", replica_name))
1823            .spawn(move || {
1824                if let Some(env) = env_opt {
1825                    if let Some(mut scanner) =
1826                        EnvironmentLogScanner::new(&env, None)
1827                    {
1828                        log::info!(
1829                            "FeederRunner for replica '{}': using \
1830                             EnvironmentLogScanner (WAL auto-feed)",
1831                            replica_clone,
1832                        );
1833                        let _ = runner_clone.run(&mut scanner);
1834                    } else {
1835                        log::warn!(
1836                            "FeederRunner for replica '{}': \
1837                             EnvironmentLogScanner unavailable, \
1838                             falling back to in-memory queue",
1839                            replica_clone,
1840                        );
1841                        let mut source = PeerScannerAdapter::new(queue, 0);
1842                        let _ = runner_clone.run(&mut source);
1843                    }
1844                } else {
1845                    let mut source = PeerScannerAdapter::new(queue, 0);
1846                    let _ = runner_clone.run(&mut source);
1847                }
1848                log::debug!(
1849                    "FeederRunner for replica '{}' exited cleanly",
1850                    replica_clone
1851                );
1852            })
1853            .expect("failed to spawn FeederRunner thread");
1854
1855        {
1856            let mut runners = self.active_feeder_runners.lock().unwrap();
1857            runners.insert(replica_name.clone(), Arc::clone(&runner));
1858        }
1859        self.io_threads.lock().unwrap().push(handle);
1860
1861        log::info!(
1862            "Node '{}' (master): FeederRunner thread spawned for replica '{}'",
1863            self.config.node_name.as_str(),
1864            replica_name,
1865        );
1866    }
1867
1868    // -----------------------------------------------------------------------
1869
1870    /// Bootstrap this node's environment by network-restoring all `.ndb`
1871    /// files from `peer_name` via the dispatcher's RESTORE service.
1872    ///
1873    /// Closes findings F2 / F4 of the 2026 review.
1874    ///
1875    /// The standalone `NetworkRestore::execute()` opens raw TCP and
1876    /// expects to drive the legacy `NetworkRestoreServer::start` listener.
1877    /// Production replicated environments host the RESTORE handler on the
1878    /// dispatcher, so this method routes through `execute_via_dispatcher`.
1879    ///
1880    /// `peer_name` must be a known peer in `GroupService`; on success the
1881    /// peer's `.ndb` files are written into `config.env_home`.  Returns
1882    /// `Err` if `env_home` is `None`, the peer is unknown, or the restore
1883    /// fails for any reason.
1884    pub fn bootstrap_via_dispatcher(&self, peer_name: &str) -> Result<()> {
1885        let env_home = self.config.env_home.clone().ok_or_else(|| {
1886            RepError::ConfigError(
1887                "bootstrap_via_dispatcher requires env_home in RepConfig"
1888                    .into(),
1889            )
1890        })?;
1891        let peer_info = self
1892            .group_service
1893            .get_all_nodes()
1894            .into_iter()
1895            .find(|n| n.name == peer_name)
1896            .ok_or_else(|| {
1897                RepError::ConfigError(format!(
1898                    "peer '{}' not registered in group '{}'",
1899                    peer_name, self.config.group_name,
1900                ))
1901            })?;
1902
1903        let cfg = NetworkRestoreConfig {
1904            source_node: peer_info.name.clone(),
1905            source_host: peer_info.host.clone(),
1906            source_port: peer_info.port,
1907            retain_log_files: true,
1908        };
1909        let restore = NetworkRestore::new(cfg).with_local_dir(env_home);
1910        restore.execute_via_dispatcher()?;
1911        log::info!(
1912            "Node '{}' bootstrapped via dispatcher from '{}' ({}:{})",
1913            self.config.node_name,
1914            peer_info.name,
1915            peer_info.host,
1916            peer_info.port,
1917        );
1918        Ok(())
1919    }
1920
1921    /// Get replication statistics.
1922    ///
1923    ///
1924    ///
1925    /// Returns statistics associated with this environment.
1926    pub fn get_stats(&self) -> &RepStats {
1927        &self.stats
1928    }
1929
1930    /// Get the ack tracker.
1931    pub fn get_ack_tracker(&self) -> &AckTracker {
1932        &self.ack_tracker
1933    }
1934
1935    /// Ensure the node state machine is in Unknown state, transitioning
1936    /// from Detached if necessary. This is needed because the state machine
1937    /// only allows Detached -> Unknown -> Master/Replica.
1938    pub fn ensure_unknown_state(&self) -> Result<()> {
1939        let current = self.node_state.get_state();
1940        match current {
1941            NodeState::Unknown => Ok(()),
1942            NodeState::Detached => {
1943                self.node_state.transition_to(NodeState::Unknown)?;
1944                Ok(())
1945            }
1946            // Master and Replica must transition through Unknown before
1947            // joining a new group or reconnecting.
1948            NodeState::Master | NodeState::Replica => {
1949                self.node_state.transition_to(NodeState::Unknown)?;
1950                Ok(())
1951            }
1952            NodeState::Shutdown => {
1953                Err(RepError::StateError("Node is shut down".to_string()))
1954            }
1955        }
1956    }
1957
1958    /// Transition to master state.
1959    ///
1960    /// Transitions this node to Master state for the given election term.
1961    /// As master, the node can accept write operations and feed log entries
1962    /// to replicas.
1963    ///
1964    /// **Active push-feeder** (C-C2): if feeder channels have been registered
1965    /// via [`Self::register_feeder_channel`] before this call, a
1966    /// [`FeederRunner`] background thread is spawned per channel.
1967    ///
1968    /// **WAL-scanner auto-feed path (C-C2b, v3.3.0)**: when
1969    /// [`Self::with_environment`] has been called before `become_master`,
1970    /// each `FeederRunner` thread uses an [`EnvironmentLogScanner`] as its
1971    /// source.  Every `log_txn_commit` on the master writes a VLSN-tagged
1972    /// 22-byte WAL entry (via `LogManager::log_with_vlsn`); the scanner
1973    /// discovers these entries and streams them to replicas automatically,
1974    /// without any [`Self::replicate_entry`] call from the application.
1975    ///
1976    /// **Fallback path**: when no `EnvironmentImpl` is wired, the runner
1977    /// reads from the in-memory queue populated by [`Self::replicate_entry`] /
1978    /// [`Self::apply_entry`].
1979    ///
1980    /// If no feeder channels are registered, this call registers per-replica
1981    /// `Feeder` tracker structs for `AckTracker` bookkeeping only.  In that
1982    /// case replicas must connect proactively to the `PEER_FEEDER` pull
1983    /// service to receive entries.
1984    pub fn become_master(&self, term: u64) -> Result<()> {
1985        if self.is_shutdown() {
1986            return Err(RepError::StateError(
1987                "Cannot become master: environment is closed".to_string(),
1988            ));
1989        }
1990
1991        // JE invariant: only `Electable` nodes can become master.  `Secondary`,
1992        // `Monitor`, and `Arbiter` are not electable and must be rejected at
1993        // the API layer (mirrors JE `ExceptionTest`).  See
1994        // `NodeType::can_be_master`.
1995        if !self.config.node_type.can_be_master() {
1996            return Err(RepError::InvalidStateTransition(format!(
1997                "node '{}' has type {} which is not electable as master",
1998                self.config.node_name.as_str(),
1999                self.config.node_type,
2000            )));
2001        }
2002
2003        // Ensure we can reach Master state (may need Detached -> Unknown first)
2004        self.ensure_unknown_state()?;
2005
2006        let old_state = self.node_state.get_state();
2007        self.node_state.transition_to(NodeState::Master)?;
2008        self.master_tracker.set_master(self.config.node_name.as_str(), term);
2009
2010        // --- F9: spawn Feeder trackers for each known replica -------------
2011        //
2012        // Closes finding F9 of the 2026 review.
2013        // The architecture is pull-based: replicas pull from the master's
2014        // `PEER_FEEDER` service via `catch_up_from_peer`.  However, the
2015        // master must:
2016        //   1. Track each replica via a `Feeder` so AckTracker bookkeeping
2017        //      can attribute replica acks to the right node.
2018        //   2. Push its own writes into `peer_scanner` so replicas pulling
2019        //      from `PEER_FEEDER` actually receive entries (`replicate_entry`).
2020        //
2021        // Here we ensure step 1: every known electable peer in the group
2022        // gets a `Feeder` entry.
2023        {
2024            let mut feeders = self.feeders.write();
2025            // Drop any stale feeders left over from a prior role.  A
2026            // `Feeder` is just an in-memory tracker; recreating it is
2027            // cheap and avoids state inversion bugs across role changes.
2028            feeders.clear();
2029            for peer in self.group_service.get_all_nodes() {
2030                if peer.name == self.config.node_name {
2031                    continue;
2032                }
2033                if peer.node_type != crate::node_type::NodeType::Electable
2034                    && peer.node_type != crate::node_type::NodeType::Secondary
2035                {
2036                    // Arbiters do not receive log entries.
2037                    continue;
2038                }
2039                feeders.push(Feeder::new(peer.name.clone()));
2040                log::debug!(
2041                    "Node '{}' (master, term={}): registered Feeder for \
2042                     replica '{}'",
2043                    self.config.node_name.as_str(),
2044                    term,
2045                    peer.name,
2046                );
2047            }
2048        }
2049
2050        // For observability, log the count.
2051        log::info!(
2052            "Node '{}' became master for term {} \
2053             (feeder trackers: {} known replicas)",
2054            self.config.node_name.as_str(),
2055            term,
2056            self.feeders.read().len(),
2057        );
2058
2059        // C-C2: spawn FeederRunner threads for pre-registered channels.
2060        //
2061        // When `register_feeder_channel` was called before `become_master`,
2062        // the channels are already in `feeder_channels`. Drain them and
2063        // spawn a FeederRunner per replica.  The FeederRunner reads from a
2064        // dedicated `PeerLogScanner` queue (populated by `replicate_entry`
2065        // fan-out) and pushes framed log entries to the replica over the
2066        // registered channel.  Acks from the replica are tracked in the
2067        // FeederRunner and visible via `active_feeder_runner_acked_vlsn`.
2068        {
2069            let channels: Vec<(String, Arc<dyn crate::net::Channel>)> = self
2070                .feeder_channels
2071                .lock()
2072                .unwrap()
2073                .iter()
2074                .map(|(k, v)| (k.clone(), Arc::clone(v)))
2075                .collect();
2076            for (replica_name, channel) in channels {
2077                self.spawn_feeder_runner(replica_name, channel);
2078            }
2079        }
2080
2081        // -------------------------------------------------------------------
2082
2083        // Notify listeners
2084        self.notify_listeners(old_state, NodeState::Master);
2085
2086        Ok(())
2087    }
2088
2089    /// Transition to replica state with the given master.
2090    ///
2091    /// Transitions this node to Replica state. The node will receive log
2092    /// entries from the specified master.
2093    ///
2094    /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
2095    /// the method prepares an `EnvironmentLogWriter` so that replicated
2096    /// entries can be written to the local log.  The actual network connection
2097    /// is established by the `TcpServiceDispatcher`; this method logs intent.
2098    ///
2099    /// In HA.
2100    pub fn become_replica(&self, master_name: &str) -> Result<()> {
2101        if self.is_shutdown() {
2102            return Err(RepError::StateError(
2103                "Cannot become replica: environment is closed".to_string(),
2104            ));
2105        }
2106
2107        // Ensure we can reach Replica state (may need Detached -> Unknown first)
2108        self.ensure_unknown_state()?;
2109
2110        let old_state = self.node_state.get_state();
2111        self.node_state.transition_to(NodeState::Replica)?;
2112        self.master_tracker.set_master(master_name, 0);
2113        self.replica_stream.set_master(master_name);
2114        self.replica_stream.set_state(
2115            crate::stream::replica_stream::ReplicaStreamState::Connecting,
2116        );
2117
2118        // --- G19: start replica receive loop --------------------------------
2119        //
2120        // Connects to the master's PEER_FEEDER service and runs a
2121        // ReplicaReceiver loop in a background thread.  The receiver writes
2122        // replicated entries via EnvironmentLogWriter.
2123        if let Some(env) = self.env_impl.lock().unwrap().clone() {
2124            if let Some(log_mgr) = env.get_log_manager() {
2125                // REP-6: feed the env's SHARED, persisted VLSN index (the one
2126                // flush_to_disk persists and get_vlsn_range / election ranking
2127                // read) into the replica receive loop — NOT a throwaway. Using
2128                // a fresh index would leave the persisted vlsn.idx, the
2129                // reported VLSN range, and the DTVLSN-ranking own_vlsn lagging
2130                // the actually-received stream, widening catch-up (or forcing
2131                // an unnecessary network restore) after a clean restart.
2132                // JE: the replica's VLSNIndex IS the environment's persisted
2133                // index (see VLSNIndex).
2134                let vlsn_index = Arc::clone(&self.vlsn_index);
2135
2136                // Resolve the master's socket address from the GroupService.
2137                let master_addr_opt: Option<SocketAddr> = self
2138                    .group_service
2139                    .get_all_nodes()
2140                    .iter()
2141                    .find(|n| n.name == master_name)
2142                    .and_then(|info| {
2143                        format!("{}:{}", info.host, info.port)
2144                            .parse::<SocketAddr>()
2145                            .ok()
2146                    });
2147
2148                let node_name = self.config.node_name.clone();
2149                let master = master_name.to_string();
2150                let vlsn_index_clone = Arc::clone(&vlsn_index);
2151                let shutdown_flag = self.io_shutdown.load(Ordering::SeqCst);
2152                // Wave 9-A fix 2: capture a Weak<Self> so the I/O thread
2153                // can call `bootstrap_via_dispatcher` automatically when
2154                // the master signals `NeedsRestore`.  When the env was
2155                // never registered with `init_self_weak` (raw
2156                // `Arc::new(Self::new(...))` without going through
2157                // `open()` or the test harness), the weak ref is `None`
2158                // and we fall back to operator-driven bootstrap.
2159                let self_weak: Option<Weak<Self>> =
2160                    self.self_weak.get().cloned();
2161
2162                // REP-7 (B): clone the live EnvironmentImpl into the replica
2163                // thread so the writer can drive a ReplicaReplay that applies
2164                // each streamed entry to the live in-memory tree.
2165                let env_for_replay = Arc::clone(&env);
2166
2167                // REP-10 (C): build the ReplicaReplay HERE (not inside the
2168                // closure) so we can publish its REP-7 `last_applied_vlsn`
2169                // handle to a ConsistencyTracker BEFORE the thread starts
2170                // streaming.  A read on this replica then waits on the same
2171                // handle the replay thread advances.  Port of
2172                // RepImpl.getConsistency / Replica.getConsistencyTracker.
2173                let replay = noxu_dbi::ReplicaReplay::new(env_for_replay);
2174                let tracker = crate::ConsistencyTracker::new(
2175                    replay.last_applied_vlsn_handle(),
2176                );
2177                *self.consistency_tracker.lock().unwrap() = Some(tracker);
2178
2179                let handle = std::thread::Builder::new()
2180                    .name(format!("noxu-replica-{}", node_name))
2181                    .spawn(move || {
2182                        // REP-7 (B): wire the live replay-apply path so reads
2183                        // on the replica see replicated data without a
2184                        // restart.  JE: the replica writes each entry to its
2185                        // log, then Replay.replayEntry applies it to the tree.
2186                        let mut writer = EnvironmentLogWriter::with_replay(
2187                            log_mgr,
2188                            vlsn_index_clone,
2189                            replay,
2190                        );
2191
2192                        let Some(addr) = master_addr_opt else {
2193                            log::warn!(
2194                                "noxu-replica-{}: master '{}' address not in RepGroup; \
2195                                 waiting for TCP dispatcher connection",
2196                                node_name, master,
2197                            );
2198                            return;
2199                        };
2200
2201                        // Catch-up loop: catch up, observe NeedsRestore,
2202                        // optionally auto-bootstrap, retry once.  We cap
2203                        // the retry count at MAX_AUTO_BOOTSTRAP_ATTEMPTS
2204                        // (small) so a misbehaving master does not loop
2205                        // forever consuming network bandwidth.
2206                        const MAX_AUTO_BOOTSTRAP_ATTEMPTS: u32 = 2;
2207                        let mut attempts: u32 = 0;
2208                        loop {
2209                            log::info!(
2210                                "noxu-replica-{}: connecting to master '{}' at {}",
2211                                node_name, master, addr,
2212                            );
2213                            match crate::stream::peer_feeder::catch_up_from_peer(
2214                                addr, 0, &mut writer,
2215                            ) {
2216                                Ok(true) => {
2217                                    log::info!(
2218                                        "noxu-replica-{}: catch-up complete from '{}'",
2219                                        node_name, master,
2220                                    );
2221                                    return;
2222                                }
2223                                Ok(false) => {
2224                                    // F2/F4: master signals NeedsRestore.
2225                                    // Wave 9-A fix 2: if a Weak<Self> was
2226                                    // plumbed in, upgrade it and call
2227                                    // `bootstrap_via_dispatcher` ourselves
2228                                    // so the replica auto-bootstraps and
2229                                    // resumes catch-up without operator
2230                                    // intervention.
2231                                    log::warn!(
2232                                        "noxu-replica-{}: master '{}' requires restore",
2233                                        node_name, master,
2234                                    );
2235                                    attempts += 1;
2236                                    if attempts > MAX_AUTO_BOOTSTRAP_ATTEMPTS {
2237                                        log::error!(
2238                                            "noxu-replica-{}: exceeded \
2239                                             auto-bootstrap attempts ({}); giving up",
2240                                            node_name,
2241                                            MAX_AUTO_BOOTSTRAP_ATTEMPTS,
2242                                        );
2243                                        return;
2244                                    }
2245                                    let env_arc = match self_weak
2246                                        .as_ref()
2247                                        .and_then(Weak::upgrade)
2248                                    {
2249                                        Some(e) => e,
2250                                        None => {
2251                                            // No back-ref or env dropped:
2252                                            // fall back to operator-driven
2253                                            // bootstrap and exit cleanly.
2254                                            log::warn!(
2255                                                "noxu-replica-{}: no back-reference \
2256                                                 available; operator must call \
2257                                                 bootstrap_via_dispatcher manually",
2258                                                node_name,
2259                                            );
2260                                            return;
2261                                        }
2262                                    };
2263                                    if env_arc.is_shutdown() {
2264                                        return;
2265                                    }
2266                                    log::info!(
2267                                        "noxu-replica-{}: auto-bootstrapping via \
2268                                         dispatcher from '{}' (attempt {})",
2269                                        node_name, master, attempts,
2270                                    );
2271                                    match env_arc
2272                                        .bootstrap_via_dispatcher(&master)
2273                                    {
2274                                        Ok(()) => {
2275                                            log::info!(
2276                                                "noxu-replica-{}: auto-bootstrap \
2277                                                 succeeded; resuming catch-up",
2278                                                node_name,
2279                                            );
2280                                            // Drop the strong ref before
2281                                            // re-entering catch-up so we
2282                                            // do not keep the env alive
2283                                            // longer than necessary.
2284                                            drop(env_arc);
2285                                            continue;
2286                                        }
2287                                        Err(e) => {
2288                                            log::error!(
2289                                                "noxu-replica-{}: auto-bootstrap \
2290                                                 failed: {}",
2291                                                node_name, e,
2292                                            );
2293                                            return;
2294                                        }
2295                                    }
2296                                }
2297                                Err(e) => {
2298                                    if !shutdown_flag {
2299                                        log::error!(
2300                                            "noxu-replica-{}: error from master '{}': {e}",
2301                                            node_name, master,
2302                                        );
2303                                    }
2304                                    return;
2305                                }
2306                            }
2307                        }
2308                    })
2309                    .expect("failed to spawn noxu-replica thread");
2310
2311                self.io_threads.lock().unwrap().push(handle);
2312
2313                log::debug!(
2314                    "Node '{}': replica receive thread started for master '{}'",
2315                    self.config.node_name.as_str(),
2316                    master_name,
2317                );
2318            } else {
2319                log::warn!(
2320                    "Node '{}': no LogManager available (read-only env?); \
2321                     replica I/O loop not started",
2322                    self.config.node_name.as_str(),
2323                );
2324            }
2325        }
2326        // -------------------------------------------------------------------
2327
2328        // Notify listeners
2329        self.notify_listeners(old_state, NodeState::Replica);
2330
2331        log::info!(
2332            "Node '{}' became replica of master '{}'",
2333            self.config.node_name.as_str(),
2334            master_name
2335        );
2336        Ok(())
2337    }
2338
2339    /// Initiate a master transfer to the target node.
2340    ///
2341    ///
2342    ///
2343    /// Transfers the current master state from this node to one of the
2344    /// electable replicas. The replica that is actually chosen to be the new
2345    /// master is the one with which the Master Transfer can be completed most
2346    /// rapidly. The transfer operation ensures that all changes at this node
2347    /// are available at the new master upon conclusion of the operation.
2348    pub fn transfer_master(&self, config: MasterTransferConfig) -> Result<()> {
2349        if self.is_shutdown() {
2350            return Err(RepError::StateError(
2351                "Cannot transfer master: environment is closed".to_string(),
2352            ));
2353        }
2354
2355        if !self.is_master() {
2356            return Err(RepError::InvalidState(
2357                "Master transfer can only be initiated on the master node"
2358                    .to_string(),
2359            ));
2360        }
2361
2362        log::info!(
2363            "Node '{}' initiating master transfer to '{}'",
2364            self.config.node_name.as_str(),
2365            config.target_node,
2366        );
2367
2368        // Closes finding F7 of the 2026 review.
2369        //
2370        // Steps:
2371        //   1. Locate the target's address.
2372        //   2. Compute the new term (current observed term + 1).
2373        //   3. Send TRANSFER_MASTER to the target — it will become master.
2374        //   4. Send TRANSFER_MASTER (with the same term + new master name) to
2375        //      every other peer so they re-target.
2376        //   5. Demote self to Replica of the target.
2377        //
2378        // The transfer is best-effort: a peer that doesn't ack is logged
2379        // and skipped.  The election driver will reconcile any divergence
2380        // on the next election round.
2381
2382        let target_addr = self
2383            .group_service
2384            .get_all_nodes()
2385            .into_iter()
2386            .find(|n| n.name == config.target_node)
2387            .and_then(|n| {
2388                format!("{}:{}", n.host, n.port)
2389                    .parse::<std::net::SocketAddr>()
2390                    .ok()
2391            })
2392            .ok_or_else(|| {
2393                RepError::ConfigError(format!(
2394                    "transfer_master: target '{}' not registered or has bad address",
2395                    config.target_node
2396                ))
2397            })?;
2398
2399        let new_term = self.master_tracker.get_term().saturating_add(1);
2400
2401        // 1. Tell the target to become master at the new term.
2402        let target_ack = crate::group_admin::send_transfer_master(
2403            target_addr,
2404            &config.target_node,
2405            new_term,
2406        )
2407        .map_err(|e| {
2408            RepError::NetworkError(format!(
2409                "transfer_master: failed to signal target '{}': {}",
2410                config.target_node, e
2411            ))
2412        })?;
2413        if !target_ack {
2414            return Err(RepError::StateError(format!(
2415                "transfer_master: target '{}' rejected the transfer",
2416                config.target_node
2417            )));
2418        }
2419
2420        // 2. Inform all other peers (best-effort).
2421        for peer in self.group_service.get_all_nodes() {
2422            if peer.name == self.config.node_name
2423                || peer.name == config.target_node
2424            {
2425                continue;
2426            }
2427            if let Ok(addr) = format!("{}:{}", peer.host, peer.port).parse() {
2428                let _ = crate::group_admin::send_transfer_master(
2429                    addr,
2430                    &config.target_node,
2431                    new_term,
2432                );
2433            }
2434        }
2435
2436        // 3. Demote self to Replica of the new master.
2437        self.become_replica(&config.target_node)?;
2438
2439        log::info!(
2440            "Node '{}' transferred master to '{}' at term {}",
2441            self.config.node_name.as_str(),
2442            config.target_node,
2443            new_term,
2444        );
2445        Ok(())
2446    }
2447
2448    /// Register a VLSN (as master, after writing a log entry).
2449    ///
2450    /// Maps the given VLSN to the specified log file position. This is called
2451    /// by the master after it writes a replicated log entry.
2452    pub fn register_vlsn(&self, vlsn: u64, file_number: u32, file_offset: u32) {
2453        self.vlsn_index.register(vlsn, file_number, file_offset);
2454    }
2455
2456    /// Register a VLSN→LSN mapping with its `LogEntryType`, so `lastSync` /
2457    /// `lastTxnEnd` advance (JE `VLSNRange.getUpdateForNewMapping`). Used by
2458    /// the syncup driver/tests that apply VLSN-tagged entries to a real log
2459    /// and need the sync/commit boundaries to track the stream.
2460    pub fn register_vlsn_typed(
2461        &self,
2462        vlsn: u64,
2463        file_number: u32,
2464        file_offset: u32,
2465        entry_type: noxu_log::LogEntryType,
2466    ) {
2467        self.vlsn_index.register_with_type(
2468            vlsn,
2469            file_number,
2470            file_offset,
2471            entry_type,
2472        );
2473    }
2474
2475    /// Replicate a freshly committed log entry from the master.
2476    ///
2477    /// Closes finding F9 of the 2026 review.
2478    ///
2479    /// Combines `register_vlsn` with a push into the in-memory
2480    /// `peer_scanner` so that downstream replicas pulling from this
2481    /// node's `PEER_FEEDER` service (via `catch_up_from_peer`) can
2482    /// stream the entry without round-tripping through the on-disk
2483    /// log.  The local log is still the source of truth; the peer
2484    /// scanner is a fast-path cache that bounds itself via
2485    /// `PeerLogScanner::with_capacity` so old entries are evicted.
2486    ///
2487    /// Should be called by the master after the local commit has
2488    /// fsynced.  Calling on a non-master is harmless (the peer
2489    /// scanner cache is also used by replicas) but is logged at trace
2490    /// level for diagnostics.
2491    pub fn replicate_entry(
2492        &self,
2493        vlsn: u64,
2494        file_number: u32,
2495        file_offset: u32,
2496        entry_type: u8,
2497        data: Vec<u8>,
2498    ) {
2499        // Register VLSN -> LSN, dispatching entry type so lastSync /
2500        // lastTxnEnd advance (REP-5; JE VLSNRange.getUpdateForNewMapping).
2501        // An unknown type byte falls back to extend-only registration.
2502        match noxu_log::LogEntryType::from_type_num(entry_type) {
2503            Some(et) => self.vlsn_index.register_with_type(
2504                vlsn,
2505                file_number,
2506                file_offset,
2507                et,
2508            ),
2509            None => self.vlsn_index.register(vlsn, file_number, file_offset),
2510        }
2511        // Pull path: shared peer_scanner serves replicas connecting via
2512        // PeerFeederService (catch_up_from_peer).
2513        self.peer_scanner.push(vlsn, entry_type, data.clone());
2514        // Push path (C-C2): fan out to per-replica FeederRunner queues so
2515        // that threads spawned by become_master can stream entries to each
2516        // registered replica without competing with PeerFeederService.
2517        {
2518            let queues = self.feeder_queues.read().unwrap();
2519            for queue in queues.values() {
2520                queue.push(vlsn, entry_type, data.clone());
2521            }
2522        }
2523        if !self.is_master() {
2524            log::trace!(
2525                "replicate_entry called on non-master node '{}': vlsn={}, type={}",
2526                self.config.node_name,
2527                vlsn,
2528                entry_type,
2529            );
2530        }
2531    }
2532
2533    /// Apply a replicated entry (as replica).
2534    ///
2535    /// Applies a log entry received from the master. This is called by the
2536    /// replica stream handler after receiving an entry from the feeder.
2537    ///
2538    /// `data` is the wire-encoded log-record payload.  When the
2539    /// replicated environment has not been wired to a local
2540    /// `noxu_db::Environment` (i.e., before `with_environment` is
2541    /// called) the payload is forwarded into the in-memory peer
2542    /// scanner so that downstream replicas attached to the
2543    /// `PEER_FEEDER` service can re-stream it; the local log is **not**
2544    /// updated.  This is documented behaviour rather than a stub — see
2545    /// the 2026 review finding #26 (medium) for the
2546    /// `with_environment`-required local-apply path.
2547    /// cleanup (rep info F35: `_data` placeholder) renames the leading
2548    /// underscore so reviewers don't read it as a TODO.
2549    pub fn apply_entry(
2550        &self,
2551        vlsn: u64,
2552        entry_type: u8,
2553        data: Vec<u8>,
2554    ) -> Result<()> {
2555        if self.is_shutdown() {
2556            return Err(RepError::StateError(
2557                "Cannot apply entry: environment is closed".to_string(),
2558            ));
2559        }
2560
2561        // Register the VLSN in the index, dispatching entry type so
2562        // lastSync/lastTxnEnd advance (REP-5; JE
2563        // VLSNRange.getUpdateForNewMapping).
2564        match noxu_log::LogEntryType::from_type_num(entry_type) {
2565            Some(et) => self.vlsn_index.register_with_type(vlsn, 0, 0, et),
2566            None => self.vlsn_index.register(vlsn, 0, 0),
2567        }
2568
2569        // Push into the peer log scanner so downstream replicas can
2570        // receive this entry via the PEER_FEEDER service.
2571        self.peer_scanner.push(vlsn, entry_type, data.clone());
2572        // C-C2 push path: fan out to per-replica FeederRunner queues.
2573        {
2574            let queues = self.feeder_queues.read().unwrap();
2575            for queue in queues.values() {
2576                queue.push(vlsn, entry_type, data.clone());
2577            }
2578        }
2579
2580        log::trace!(
2581            "Applied replicated entry: vlsn={}, type={}",
2582            vlsn,
2583            entry_type
2584        );
2585        Ok(())
2586    }
2587
2588    /// Record an ack from a replica (as master).
2589    ///
2590    /// Records that the specified replica has acknowledged processing up to
2591    /// the given VLSN. This is used by the master to track durability
2592    /// guarantees.
2593    pub fn record_ack(&self, vlsn: u64, replica_name: &str) {
2594        // Only acks from ELECTABLE replicas count toward the durability
2595        // quorum (JE DurabilityQuorum.replicaAcksQualify: Monitors and
2596        // Secondaries do not qualify). An ack from a non-electable / unknown
2597        // node is recorded for stats elsewhere but must not satisfy the
2598        // ReplicaAckPolicy. If the node is unknown to the group view we err
2599        // toward NOT counting it.
2600        let qualifies = self
2601            .get_rep_group()
2602            .get_node(replica_name)
2603            .map(|n| n.node_type().is_electable())
2604            .unwrap_or(false);
2605        if qualifies {
2606            self.ack_tracker.record_ack(vlsn, replica_name);
2607        }
2608        // REP-9 Part 1: advance the matching `Feeder::acked_vlsn` high-water
2609        // mark (read by `update_dtvlsn_from_feeders` and exposed via
2610        // `get_acked_vlsn`).  The production `FeederRunner` previously updated
2611        // only its private `known_replica_vlsn`, so the DTVLSN ranking never
2612        // saw production progress (JE `Feeder.getReplicaTxnEndVLSN`).  We
2613        // record the high-water for *any* replica (electable or not); the
2614        // electable filter is reapplied when DTVLSN/quorum is computed.
2615        for feeder in self.feeders.read().iter() {
2616            if feeder.get_replica_name() == replica_name {
2617                feeder.record_ack(vlsn);
2618                break;
2619            }
2620        }
2621        // Recompute the DTVLSN from feeder progress whenever an ack lands.
2622        self.update_dtvlsn_from_feeders();
2623        // REP-9: wake any committer parked in `await_replica_acks`. Its
2624        // satisfaction predicate is the high-water feeder count, not an
2625        // exact-VLSN registration, so we must notify unconditionally (the
2626        // AckTracker's own `record_ack` only notifies when the exact VLSN was
2627        // registered, which the per-frame feeder acks generally are not).
2628        self.ack_tracker.notify_waiters();
2629    }
2630
2631    /// Returns the current Durable Transaction VLSN (D7, JE RepNode.getDTVLSN).
2632    /// The highest VLSN replicated to a majority of electable replicas; 0 if
2633    /// none yet. Used by the election ranking so the most-durable node wins.
2634    pub fn get_dtvlsn(&self) -> u64 {
2635        self.dtvlsn.load(std::sync::atomic::Ordering::Acquire)
2636    }
2637
2638    /// Advance the DTVLSN to `candidate` if it is greater (JE
2639    /// RepNode.updateDTVLSN — an `AtomicLongMax.updateMax`). The DTVLSN can
2640    /// only move forward. Returns the resulting (possibly unchanged) value.
2641    pub fn update_dtvlsn(&self, candidate: u64) -> u64 {
2642        use std::sync::atomic::Ordering;
2643        let mut cur = self.dtvlsn.load(Ordering::Acquire);
2644        while candidate > cur {
2645            match self.dtvlsn.compare_exchange_weak(
2646                cur,
2647                candidate,
2648                Ordering::AcqRel,
2649                Ordering::Acquire,
2650            ) {
2651                Ok(_) => return candidate,
2652                Err(observed) => cur = observed,
2653            }
2654        }
2655        cur
2656    }
2657
2658    /// Set the DTVLSN from the replication stream (JE RepNode.setDTVLSN —
2659    /// used exclusively by the replica, which maintains the DTVLSN from
2660    /// commit/abort records). Still enforced as advance-only via update_max so
2661    /// an out-of-order or stale record cannot move it backward.
2662    pub fn set_dtvlsn(&self, vlsn: u64) {
2663        self.update_dtvlsn(vlsn);
2664    }
2665
2666    /// Master-side DTVLSN computation (D7, JE FeederManager.updateDTVLSN):
2667    /// across the *qualifying* (electable) feeders whose replica-txn-end VLSN
2668    /// exceeds the current DTVLSN, take the minimum; once a SIMPLE_MAJORITY
2669    /// ack-count of them exceeds the current value, advance the DTVLSN to that
2670    /// minimum (a transaction is durable once a majority hold it).
2671    fn update_dtvlsn_from_feeders(&self) {
2672        if !self.is_master() {
2673            return;
2674        }
2675        let curr = self.get_dtvlsn();
2676
2677        // SIMPLE_MAJORITY required-ack-count over the electable group,
2678        // computed the same way as await_replica_acks.
2679        let group = self.get_rep_group();
2680        let electable_peers: u32 = group
2681            .get_nodes()
2682            .iter()
2683            .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2684            .count() as u32;
2685        let electable_count = electable_peers + 1; // +1 for self/master
2686        // required electable acks for SIMPLE_MAJORITY = floor(n/2) replicas
2687        // (the master self-acks; a majority is reached when this many peers
2688        // also hold the VLSN).
2689        let durable_ack_count = electable_count / 2;
2690        if durable_ack_count == 0 {
2691            // Single-node (or majority is self alone): the master's own log is
2692            // immediately durable up to its latest VLSN.
2693            self.update_dtvlsn(self.get_current_vlsn());
2694            return;
2695        }
2696
2697        let mut min = u64::MAX;
2698        let mut ack_count: u32 = 0;
2699        for feeder in self.feeders.read().iter() {
2700            // replicaAcksQualify: only electable feeders count (D6).
2701            let qualifies = group
2702                .get_node(&feeder.get_replica_name())
2703                .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2704                .unwrap_or(false);
2705            if !qualifies {
2706                continue;
2707            }
2708            let replica_vlsn = feeder.get_acked_vlsn();
2709            if replica_vlsn <= curr {
2710                continue;
2711            }
2712            if replica_vlsn < min {
2713                min = replica_vlsn;
2714            }
2715            ack_count += 1;
2716            if ack_count >= durable_ack_count {
2717                // A majority of electable replicas hold >= min: durable.
2718                self.update_dtvlsn(min);
2719                return;
2720            }
2721        }
2722        // DTVLSN unchanged.
2723    }
2724
2725    /// REP-9: count qualifying (electable) feeders whose acked high-water VLSN
2726    /// is `>= commit_vlsn`.  This is the Rust equivalent of JE
2727    /// `FeederManager.getNumCurrentAckFeeders(commitVLSN)` — the durability
2728    /// quorum is satisfied when this count reaches the required ack count.
2729    /// Only Electable replicas qualify (D6, JE
2730    /// `DurabilityQuorum.replicaAcksQualify`).
2731    fn count_ack_feeders_ge(&self, commit_vlsn: u64) -> u32 {
2732        let group = self.get_rep_group();
2733        let mut count = 0u32;
2734        for feeder in self.feeders.read().iter() {
2735            let qualifies = group
2736                .get_node(&feeder.get_replica_name())
2737                .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2738                .unwrap_or(false);
2739            // A feeder counts only if it has acked a *real* VLSN at or above
2740            // the commit VLSN.  `acked_vlsn == 0` is the NULL sentinel (no ack
2741            // yet) and must never satisfy a commit, even when `commit_vlsn`
2742            // itself is 0 (no replicated commit logged) — mirrors JE
2743            // `getReplicaTxnEndVLSN()` returning NULL_VLSN for a fresh feeder,
2744            // which is not `>=` any commit VLSN.
2745            let acked = feeder.get_acked_vlsn();
2746            if qualifies && acked > 0 && acked >= commit_vlsn {
2747                count += 1;
2748            }
2749        }
2750        count
2751    }
2752
2753    /// Set the state change listener.
2754    ///
2755    ///
2756    ///
2757    /// Sets the listener used to receive asynchronous replication node state
2758    /// change events. Note that there is one listener per replication node,
2759    /// not one per handle. Invoking this method adds to the set of listeners.
2760    ///
2761    /// Invoking this method typically results in an immediate callback to the
2762    /// application via the `on_state_change` method, so that the application
2763    /// is made aware of the existing state of the node at the time the listener
2764    /// is first established.
2765    pub fn set_state_change_listener(
2766        &self,
2767        listener: Arc<dyn StateChangeListener>,
2768    ) {
2769        // Immediately notify the listener of the current state
2770        let current_state = self.node_state.get_state();
2771        let event = StateChangeEvent::new(
2772            current_state,
2773            current_state,
2774            self.get_master_name(),
2775        );
2776        listener.on_state_change(event);
2777
2778        let mut listeners = self.listeners.write();
2779        listeners.push(listener);
2780    }
2781
2782    /// Close the replicated environment.
2783    ///
2784    ///
2785    ///
2786    /// Closes this handle and releases any resources. When closed, daemon
2787    /// threads are stopped, even if they are performing work. The node ceases
2788    /// participation in the replication group. If the node was currently the
2789    /// master, the rest of the group will hold an election.
2790    ///
2791    /// The ReplicatedEnvironment should not be closed while any other type of
2792    /// handle that refers to it is not yet closed.
2793    pub fn close(&self) -> Result<()> {
2794        if self.shutdown.swap(true, Ordering::SeqCst) {
2795            // Already closed
2796            return Ok(());
2797        }
2798
2799        let old_state = self.node_state.get_state();
2800
2801        // Transition to Shutdown state. The state machine allows this from
2802        // any non-Shutdown state.
2803        let _ = self.node_state.transition_to(NodeState::Shutdown);
2804
2805        // Notify listeners of the shutdown
2806        self.notify_listeners(old_state, NodeState::Shutdown);
2807
2808        // Clear feeders
2809        {
2810            let mut feeders = self.feeders.write();
2811            feeders.clear();
2812        }
2813
2814        // C-C2: close all registered feeder channels so FeederRunner threads
2815        // observe ChannelClosed and exit their run() loops cleanly.
2816        {
2817            let channels = self.feeder_channels.lock().unwrap();
2818            for (name, ch) in channels.iter() {
2819                if let Err(e) = ch.close() {
2820                    log::debug!(
2821                        "close: feeder channel for '{}' already closed: {}",
2822                        name,
2823                        e
2824                    );
2825                }
2826            }
2827        }
2828        // Drop all active runners and queues so their Arcs release.
2829        self.active_feeder_runners.lock().unwrap().clear();
2830        self.feeder_queues.write().unwrap().clear();
2831
2832        // Signal and join all I/O threads spawned by become_master /
2833        // become_replica / start_vlsn_persistence_daemon.  The vlsn-flush
2834        // thread does a final flush on its way out so a clean close is
2835        // recoverable.  Closes finding F11.
2836        self.io_shutdown.store(true, Ordering::SeqCst);
2837        {
2838            let mut threads = self.io_threads.lock().unwrap();
2839            for handle in threads.drain(..) {
2840                let _ = handle.join();
2841            }
2842        }
2843
2844        // Belt-and-braces: even when no daemon is running (e.g.
2845        // `ReplicatedEnvironment::new` without `open`), persist a final
2846        // snapshot if env_home is configured.
2847        if let Some(ref home) = self.config.env_home
2848            && let Err(e) =
2849                crate::vlsn::persist::flush_to_disk(&self.vlsn_index, home)
2850        {
2851            log::warn!(
2852                "close: failed to persist VLSN index to {}: {}",
2853                home.display(),
2854                e
2855            );
2856        }
2857
2858        // Stop the service dispatcher (the: serviceDispatcher.shutdown()).
2859        if let Some(ref dispatcher) = self.tcp_dispatcher {
2860            dispatcher.stop();
2861            let kind = if dispatcher.is_tls() { "TLS" } else { "TCP" };
2862            log::debug!(
2863                "Node '{}' {} service dispatcher stopped",
2864                self.config.node_name.as_str(),
2865                kind,
2866            );
2867        }
2868
2869        log::info!(
2870            "Replicated environment '{}' in group '{}' closed",
2871            self.config.node_name.as_str(),
2872            self.config.group_name.as_str()
2873        );
2874
2875        Ok(())
2876    }
2877
2878    /// Close this handle and shut down the Replication Group by forcing all
2879    /// active Replicas to exit.
2880    ///
2881    ///
2882    ///
2883    /// This method must be invoked on the node that's currently the Master
2884    /// after all other outstanding handles have been closed.
2885    ///
2886    /// When push-feeder threads are active (registered via
2887    /// [`Self::register_feeder_channel`]), the master first waits up to half
2888    /// of `replica_shutdown_timeout_ms` for each FeederRunner replica to
2889    /// acknowledge all outstanding log entries (VLSN catch-up).  Replicas
2890    /// that do not catch up within the budget receive a warning; the master
2891    /// proceeds to send `SHUTDOWN_GROUP` regardless.  This closes finding M-4
2892    /// of the v3.x production-readiness review.
2893    ///
2894    /// Replicas that are not fed via a registered channel (pull-based
2895    /// `PeerFeederService` path) are sent `SHUTDOWN_GROUP` without a
2896    /// VLSN-level catch-up wait — that wait requires per-replica ack tracking
2897    /// which the pull path does not yet provide.
2898    pub fn shutdown_group(
2899        &self,
2900        replica_shutdown_timeout_ms: u64,
2901    ) -> Result<()> {
2902        if !self.is_master() {
2903            return Err(RepError::InvalidState(
2904                "shutdownGroup must be invoked on the master".to_string(),
2905            ));
2906        }
2907
2908        log::info!(
2909            "Node '{}' shutting down replication group '{}' (replica_timeout={}ms)",
2910            self.config.node_name.as_str(),
2911            self.config.group_name.as_str(),
2912            replica_shutdown_timeout_ms,
2913        );
2914
2915        // M-4: Wait for active FeederRunner replicas to ack the master's
2916        // current VLSN before sending SHUTDOWN_GROUP.  We allow up to half
2917        // the overall timeout for the catch-up phase so the second half
2918        // remains for the SHUTDOWN_GROUP send loop.
2919        let catchup_budget_ms = replica_shutdown_timeout_ms / 2;
2920        if catchup_budget_ms > 0 {
2921            let master_vlsn = self.vlsn_index.get_range().last();
2922            if master_vlsn > 0 {
2923                let runners: Vec<(String, Arc<FeederRunner>)> = self
2924                    .active_feeder_runners
2925                    .lock()
2926                    .unwrap()
2927                    .iter()
2928                    .map(|(k, v)| (k.clone(), Arc::clone(v)))
2929                    .collect();
2930                if !runners.is_empty() {
2931                    let catchup_deadline = std::time::Instant::now()
2932                        + Duration::from_millis(catchup_budget_ms);
2933                    for (name, runner) in &runners {
2934                        loop {
2935                            let acked = runner.known_replica_vlsn();
2936                            if acked >= master_vlsn
2937                                || std::time::Instant::now() >= catchup_deadline
2938                            {
2939                                if acked < master_vlsn {
2940                                    log::warn!(
2941                                        "shutdown_group: replica '{}' acked \
2942                                         VLSN {} < master VLSN {}; proceeding",
2943                                        name,
2944                                        acked,
2945                                        master_vlsn,
2946                                    );
2947                                } else {
2948                                    log::info!(
2949                                        "shutdown_group: replica '{}' caught up \
2950                                         to VLSN {}",
2951                                        name,
2952                                        acked,
2953                                    );
2954                                }
2955                                break;
2956                            }
2957                            std::thread::sleep(Duration::from_millis(10));
2958                        }
2959                    }
2960                }
2961            }
2962        }
2963
2964        // Closes finding F8 of the 2026 review.
2965        //
2966        // Send SHUTDOWN_GROUP to every known peer.  The recipient calls
2967        // its own `close()` and the per-connection ADMIN handler
2968        // returns ACK_OK.  Any peer that doesn't ack within the
2969        // timeout is logged and the master proceeds.  After signalling
2970        // every peer, the master closes its own env.
2971        let deadline = std::time::Instant::now()
2972            + Duration::from_millis(replica_shutdown_timeout_ms);
2973
2974        for peer in self.group_service.get_all_nodes() {
2975            if peer.name == self.config.node_name {
2976                continue;
2977            }
2978            // Don't exceed the deadline waiting for any single peer.
2979            let now = std::time::Instant::now();
2980            if now >= deadline {
2981                log::warn!(
2982                    "shutdown_group: deadline reached; skipping remaining peers"
2983                );
2984                break;
2985            }
2986            let addr_str = format!("{}:{}", peer.host, peer.port);
2987            let addr = match addr_str.parse::<SocketAddr>() {
2988                Ok(a) => a,
2989                Err(e) => {
2990                    log::warn!(
2991                        "shutdown_group: peer '{}' has bad address {}: {}",
2992                        peer.name,
2993                        addr_str,
2994                        e
2995                    );
2996                    continue;
2997                }
2998            };
2999            match crate::group_admin::send_shutdown_group(addr) {
3000                Ok(true) => log::info!(
3001                    "shutdown_group: peer '{}' acknowledged",
3002                    peer.name
3003                ),
3004                Ok(false) => log::warn!(
3005                    "shutdown_group: peer '{}' rejected the request",
3006                    peer.name
3007                ),
3008                Err(e) => log::warn!(
3009                    "shutdown_group: peer '{}' unreachable: {}",
3010                    peer.name,
3011                    e
3012                ),
3013            }
3014        }
3015
3016        // Master closes itself last.
3017        self.close()
3018    }
3019
3020    /// Check if shutdown is in progress.
3021    pub fn is_shutdown(&self) -> bool {
3022        self.shutdown.load(Ordering::SeqCst)
3023    }
3024
3025    /// Notify all registered listeners of a state change.
3026    fn notify_listeners(&self, old_state: NodeState, new_state: NodeState) {
3027        let listeners = self.listeners.read();
3028        if !listeners.is_empty() {
3029            let event = StateChangeEvent::new(
3030                old_state,
3031                new_state,
3032                self.get_master_name(),
3033            );
3034            for listener in listeners.iter() {
3035                listener.on_state_change(event.clone());
3036            }
3037        }
3038    }
3039}
3040
3041// ---------------------------------------------------------------------------
3042// F1: ReplicaAckCoordinator impl wires master commits into the AckTracker.
3043// ---------------------------------------------------------------------------
3044//
3045// `noxu_db::Transaction::commit_with_durability` calls
3046// `await_replica_acks` after the local WAL fsync.  This impl:
3047//
3048//   1. Rejects calls on a non-master node with `NotMaster`.
3049//   2. Rejects calls during shutdown with `Shutdown`.
3050//   3. Computes the required ack count from `electable_count` and the
3051//      requested policy.
3052//   4. Allocates a unique commit sequence number, registers the ack
3053//      requirement on the `AckTracker`, and polls `is_satisfied` with
3054//      a small sleep until either the timeout elapses or the policy
3055//      is satisfied.
3056//   5. Cleans up the tracker entry on every exit path.
3057//
3058// Closes finding F1 of the 2026 review.
3059impl ReplicaAckCoordinator for ReplicatedEnvironment {
3060    fn await_replica_acks(
3061        &self,
3062        policy: ReplicaAckPolicyKind,
3063        timeout: Duration,
3064    ) -> std::result::Result<u32, AckWaitError> {
3065        // Fast-path: ReplicaAckPolicy::None never blocks. The trait spec
3066        // says callers may already short-circuit, but be defensive.
3067        if matches!(policy, ReplicaAckPolicyKind::None) {
3068            return Ok(0);
3069        }
3070
3071        if self.is_shutdown() {
3072            return Err(AckWaitError {
3073                kind: AckWaitErrorKind::Shutdown,
3074                needed: 0,
3075                received: 0,
3076            });
3077        }
3078
3079        if !self.is_master() {
3080            return Err(AckWaitError {
3081                kind: AckWaitErrorKind::NotMaster,
3082                needed: 0,
3083                received: 0,
3084            });
3085        }
3086
3087        // Count electable peers (excluding the master) using the
3088        // RepGroup view, which counts Arbiters and Electables
3089        // identically. Only Electable nodes are counted as data
3090        // replicas able to ack a commit.  The master itself is
3091        // *implicit*: it is not registered in `group_service` (only
3092        // peers are), so we add 1 to obtain the total electable
3093        // count expected by `ReplicaAckPolicyKind::required_acks`.
3094        let group = self.get_rep_group();
3095        let electable_peers: u32 = group
3096            .get_nodes()
3097            .iter()
3098            .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
3099            .count() as u32;
3100        let electable_count: u32 = electable_peers + 1; // +1 for self/master
3101
3102        let needed = policy.required_acks(electable_count);
3103        if needed == 0 {
3104            // Single-node group, or All with only the master itself.
3105            return Ok(0);
3106        }
3107
3108        // REP-9 Part 2: the commit's VLSN is the key.  The master assigns a
3109        // VLSN when it logs the TxnCommit (via the shared `wal_vlsn_counter`
3110        // bumped in `EnvironmentImpl::log_txn_commit`), immediately before
3111        // this gate runs.  The latest assigned VLSN therefore IS this
3112        // commit's VLSN (the trait contract: "implementations are responsible
3113        // for assigning the commit VLSN internally").  We wait until a quorum
3114        // of qualifying electable replicas have acked a VLSN >= the commit
3115        // VLSN — faithful to JE `FeederManager.getNumCurrentAckFeeders`, which
3116        // counts feeders whose `getReplicaTxnEndVLSN() >= commitVLSN` (a
3117        // high-water `>=` test, NOT an exact-VLSN match).
3118        //
3119        // ponytail: reads the global high-water VLSN, so a concurrent later
3120        // commit can make this gate wait on a slightly higher VLSN than its
3121        // own. That is strictly SAFE (waiting for >= a newer VLSN never
3122        // returns early) and only marginally less precise; thread the
3123        // per-txn VLSN through the trait if exact per-commit granularity is
3124        // ever needed.
3125        let commit_vlsn = self.wal_vlsn_counter.load(Ordering::Acquire);
3126
3127        // Register on the AckTracker too: this is what `record_ack` notifies,
3128        // so the condvar wakes us as acks land.  The satisfaction decision
3129        // itself is the high-water feeder count below.
3130        self.ack_tracker.register(commit_vlsn, needed);
3131
3132        // Block on the ack condvar until a quorum of electable feeders hold
3133        // the commit VLSN, the timeout elapses, or shutdown is signalled — no
3134        // spin-poll (JE FeederTxns.TxnInfo uses a per-transaction
3135        // CountDownLatch.await; the AckTracker condvar is the shared-mutex
3136        // equivalent). record_ack notifies us as acks arrive.
3137        let satisfied = self.ack_tracker.wait_for_predicate(
3138            timeout,
3139            || self.count_ack_feeders_ge(commit_vlsn) >= needed,
3140            || self.is_shutdown(),
3141        );
3142        if satisfied {
3143            self.ack_tracker.cleanup_through(commit_vlsn);
3144            return Ok(needed);
3145        }
3146        if self.is_shutdown() {
3147            self.ack_tracker.cleanup_through(commit_vlsn);
3148            return Err(AckWaitError {
3149                kind: AckWaitErrorKind::Shutdown,
3150                needed,
3151                received: 0,
3152            });
3153        }
3154        // Timed out: report the partial ack count (qualifying electable
3155        // feeders holding the commit VLSN) so the caller can surface
3156        // InsufficientReplicas.
3157        let received = self.count_ack_feeders_ge(commit_vlsn);
3158        self.ack_tracker.cleanup_through(commit_vlsn);
3159        Err(AckWaitError { kind: AckWaitErrorKind::Timeout, needed, received })
3160    }
3161
3162    /// X-3: allocate the next VLSN for a recovered XA commit and register
3163    /// `lsn` in the VLSN index so feeders can stream the commit.
3164    ///
3165    /// Increments off the current latest VLSN so the new VLSN is strictly
3166    /// monotonically increasing.  In a single-node or master-less environment
3167    /// (not master) returns 0 (NULL_VLSN — harmless, the default).
3168    fn alloc_vlsn_for_recovered_commit(&self, lsn: noxu_util::Lsn) -> u64 {
3169        // Only allocate a VLSN when we are the master; on a replica the
3170        // recovered XA should have been replicated by the original master.
3171        if !self.is_master() {
3172            return 0;
3173        }
3174        let next_vlsn = self.vlsn_index.get_latest_vlsn() + 1;
3175        // A recovered XA commit is a commit log entry; dispatch as TxnCommit
3176        // so lastTxnEnd/lastSync advance (REP-5).
3177        self.vlsn_index.register_with_type(
3178            next_vlsn,
3179            lsn.file_number(),
3180            lsn.file_offset(),
3181            noxu_log::LogEntryType::TxnCommit,
3182        );
3183        log::debug!(
3184            "alloc_vlsn_for_recovered_commit: allocated vlsn={} for lsn={:?}",
3185            next_vlsn,
3186            lsn
3187        );
3188        next_vlsn
3189    }
3190
3191    /// R-3: pre-allocate the next commit VLSN WITHOUT registering in the index.
3192    ///
3193    /// The caller writes the `TxnCommit` WAL entry with this VLSN embedded,
3194    /// then calls `register_recovered_commit_vlsn` with the actual commit LSN.
3195    /// This two-step approach ensures the WAL entry carries the VLSN so the
3196    /// X-14 VLSN rebuild on second crash can find it.
3197    fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64 {
3198        if !self.is_master() {
3199            return 0;
3200        }
3201        // Peek at the next VLSN without registering.  The actual registration
3202        // happens in register_recovered_commit_vlsn() after the WAL write.
3203        self.vlsn_index.get_latest_vlsn() + 1
3204    }
3205
3206    /// R-3: register a pre-allocated VLSN in the VLSN index with the actual
3207    /// commit LSN.  Called after writing the `TxnCommit` WAL entry.
3208    fn register_recovered_commit_vlsn(
3209        &self,
3210        vlsn: u64,
3211        commit_lsn: noxu_util::Lsn,
3212    ) {
3213        if vlsn == 0 || !self.is_master() {
3214            return;
3215        }
3216        // The pre-allocated VLSN is for a TxnCommit WAL entry; dispatch the
3217        // type so lastTxnEnd/lastSync advance (REP-5).
3218        self.vlsn_index.register_with_type(
3219            vlsn,
3220            commit_lsn.file_number(),
3221            commit_lsn.file_offset(),
3222            noxu_log::LogEntryType::TxnCommit,
3223        );
3224        log::debug!(
3225            "register_recovered_commit_vlsn: registered vlsn={} for commit_lsn={:?}",
3226            vlsn,
3227            commit_lsn
3228        );
3229    }
3230}
3231
3232#[cfg(test)]
3233mod tests {
3234    use super::*;
3235    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};
3236
3237    /// Helper to create a test config with a fixed port (unit-test style,
3238    /// no real TCP bind needed — hostname "localhost" resolves but the port
3239    /// might be in use; use `test_config_port0` for real TCP tests).
3240    fn test_config(node_name: &str) -> RepConfig {
3241        RepConfig::builder("test_group", node_name, "localhost")
3242            .node_port(5001)
3243            .build()
3244    }
3245
3246    /// Helper to create a test config that binds to an OS-assigned port.
3247    fn test_config_port0(node_name: &str) -> RepConfig {
3248        RepConfig::builder("test_group", node_name, "127.0.0.1")
3249            .node_port(0)
3250            .build()
3251    }
3252
3253    #[test]
3254    fn test_initial_state_is_detached() {
3255        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3256        // NodeStateMachine starts in Detached state
3257        assert_eq!(env.get_state(), NodeState::Detached);
3258        assert!(!env.is_master());
3259        assert!(!env.is_replica());
3260        assert!(!env.is_active());
3261    }
3262
3263    #[test]
3264    fn test_become_master() {
3265        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3266        env.become_master(1).unwrap();
3267        assert_eq!(env.get_state(), NodeState::Master);
3268        assert!(env.is_master());
3269        assert!(!env.is_replica());
3270        assert!(env.is_active());
3271    }
3272
3273    #[test]
3274    fn test_become_replica() {
3275        let env = ReplicatedEnvironment::new(test_config("node2")).unwrap();
3276        env.become_replica("node1").unwrap();
3277        assert_eq!(env.get_state(), NodeState::Replica);
3278        assert!(!env.is_master());
3279        assert!(env.is_replica());
3280        assert!(env.is_active());
3281    }
3282
3283    #[test]
3284    fn test_get_node_name() {
3285        let env = ReplicatedEnvironment::new(test_config("my_node")).unwrap();
3286        assert_eq!(env.get_node_name(), "my_node");
3287    }
3288
3289    #[test]
3290    fn test_get_group_name() {
3291        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3292        assert_eq!(env.get_group_name(), "test_group");
3293    }
3294
3295    #[test]
3296    fn test_register_vlsn_updates_index() {
3297        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3298        env.register_vlsn(1, 0, 100);
3299        env.register_vlsn(2, 0, 200);
3300        env.register_vlsn(3, 0, 300);
3301
3302        assert_eq!(env.get_current_vlsn(), 3);
3303        let range = env.get_vlsn_range();
3304        assert_eq!(range.first(), 1);
3305        assert_eq!(range.last(), 3);
3306    }
3307
3308    #[test]
3309    fn test_record_ack() {
3310        use crate::node_type::NodeType;
3311        use crate::rep_node::RepNode;
3312        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3313        env.become_master(1).unwrap();
3314        // replicaAcksQualify: only ELECTABLE replicas count toward durability,
3315        // so the replica must be a known electable member of the group.
3316        env.add_peer(RepNode::new(
3317            "replica1".to_string(),
3318            NodeType::Electable,
3319            "127.0.0.1".to_string(),
3320            6001,
3321            2,
3322        ))
3323        .unwrap();
3324
3325        env.register_vlsn(1, 0, 100);
3326        // Register a pending ack requirement, then record ack
3327        env.get_ack_tracker().register(1, 1);
3328        env.record_ack(1, "replica1");
3329        // Ack should be satisfied
3330        assert!(env.get_ack_tracker().is_satisfied(1));
3331    }
3332
3333    #[test]
3334    fn test_record_ack_from_non_electable_does_not_qualify() {
3335        use crate::node_type::NodeType;
3336        use crate::rep_node::RepNode;
3337        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3338        env.become_master(1).unwrap();
3339        // A Monitor is NOT electable -> its ack must not count (JE
3340        // DurabilityQuorum.replicaAcksQualify).
3341        env.add_peer(RepNode::new(
3342            "monitor1".to_string(),
3343            NodeType::Monitor,
3344            "127.0.0.1".to_string(),
3345            6002,
3346            3,
3347        ))
3348        .unwrap();
3349        env.register_vlsn(1, 0, 100);
3350        env.get_ack_tracker().register(1, 1);
3351        env.record_ack(1, "monitor1");
3352        assert!(
3353            !env.get_ack_tracker().is_satisfied(1),
3354            "non-electable ack must not satisfy durability quorum"
3355        );
3356        // An unknown replica likewise does not qualify.
3357        env.record_ack(1, "ghost");
3358        assert!(!env.get_ack_tracker().is_satisfied(1));
3359    }
3360
3361    #[test]
3362    fn test_authoritative_quorum_met() {
3363        // 1-node group (electable_total=1): master alone IS authoritative
3364        // (quorum_size = 1/2+1 = 1; 0 replicas + 1 >= 1).
3365        assert!(ReplicatedEnvironment::authoritative_quorum_met(0, 1));
3366        // 3-node group (electable_total=3, quorum_size=2): master with 0
3367        // connected replicas is the minority -> NOT authoritative.
3368        assert!(!ReplicatedEnvironment::authoritative_quorum_met(0, 3));
3369        // 3-node group with 1 connected electable replica -> 1+1=2 >= 2 -> yes.
3370        assert!(ReplicatedEnvironment::authoritative_quorum_met(1, 3));
3371        // 5-node group (quorum_size=3): need 2 connected replicas.
3372        assert!(!ReplicatedEnvironment::authoritative_quorum_met(1, 5));
3373        assert!(ReplicatedEnvironment::authoritative_quorum_met(2, 5));
3374    }
3375
3376    #[test]
3377    fn test_is_authoritative_master_requires_master_role() {
3378        // A non-master is never authoritative regardless of connections.
3379        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3380        assert!(!env.is_master());
3381        assert!(!env.is_authoritative_master());
3382        // A single-node master (no peers) IS authoritative.
3383        env.become_master(1).unwrap();
3384        assert!(env.is_authoritative_master());
3385    }
3386
3387    #[test]
3388    fn test_dtvlsn_update_max_advances_only() {
3389        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3390        assert_eq!(env.get_dtvlsn(), 0);
3391        assert_eq!(env.update_dtvlsn(10), 10);
3392        assert_eq!(env.get_dtvlsn(), 10);
3393        // A lower candidate must not move it backward.
3394        assert_eq!(env.update_dtvlsn(5), 10);
3395        assert_eq!(env.get_dtvlsn(), 10);
3396        // Equal is a no-op.
3397        assert_eq!(env.update_dtvlsn(10), 10);
3398        // set_dtvlsn (replica path) is also advance-only.
3399        env.set_dtvlsn(7);
3400        assert_eq!(env.get_dtvlsn(), 10);
3401        env.set_dtvlsn(20);
3402        assert_eq!(env.get_dtvlsn(), 20);
3403    }
3404
3405    #[test]
3406    fn test_dtvlsn_majority_min_across_feeders() {
3407        use crate::node_type::NodeType;
3408        use crate::rep_node::RepNode;
3409        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3410        env.become_master(1).unwrap();
3411        // Three electable replicas → electable_count = 4 (incl. master) →
3412        // durable_ack_count = 2. With master self-ack, DTVLSN advances to the
3413        // min of the 2 highest qualifying feeders that exceed the current
3414        // DTVLSN.
3415        for (i, name) in ["r1", "r2", "r3"].iter().enumerate() {
3416            env.add_peer(RepNode::new(
3417                name.to_string(),
3418                NodeType::Electable,
3419                "127.0.0.1".to_string(),
3420                6100 + i as u16,
3421                (i + 2) as u32,
3422            ))
3423            .unwrap();
3424        }
3425        // Register feeders with differing acked VLSNs: r1=100, r2=80, r3=50.
3426        for (name, vlsn) in [("r1", 100u64), ("r2", 80), ("r3", 50)] {
3427            let f = crate::stream::feeder::Feeder::new(name.to_string());
3428            f.record_ack(vlsn);
3429            env.feeders.write().push(f);
3430        }
3431        env.update_dtvlsn_from_feeders();
3432        // First two qualifying feeders encountered are r1(100), r2(80);
3433        // min(100,80)=80 and that is a majority (2 of 4) → DTVLSN = 80.
3434        // (r3=50 < 80 is not required for durability.)
3435        assert!(
3436            env.get_dtvlsn() >= 80,
3437            "DTVLSN must reach the majority-min (>=80), got {}",
3438            env.get_dtvlsn()
3439        );
3440    }
3441
3442    #[test]
3443    fn test_close_sets_shutdown() {
3444        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3445        assert!(!env.is_shutdown());
3446
3447        env.close().unwrap();
3448        assert!(env.is_shutdown());
3449        // After close, state should be Shutdown
3450        assert_eq!(env.get_state(), NodeState::Shutdown);
3451    }
3452
3453    #[test]
3454    fn test_close_is_idempotent() {
3455        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3456        env.close().unwrap();
3457        env.close().unwrap(); // Should not error
3458        assert!(env.is_shutdown());
3459    }
3460
3461    #[test]
3462    fn test_cannot_become_master_when_shutdown() {
3463        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3464        env.close().unwrap();
3465
3466        let result = env.become_master(1);
3467        assert!(result.is_err());
3468    }
3469
3470    #[test]
3471    fn test_cannot_become_replica_when_shutdown() {
3472        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3473        env.close().unwrap();
3474
3475        let result = env.become_replica("master");
3476        assert!(result.is_err());
3477    }
3478
3479    #[test]
3480    fn test_cannot_apply_entry_when_shutdown() {
3481        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3482        env.close().unwrap();
3483
3484        let result = env.apply_entry(1, 0, vec![1, 2, 3]);
3485        assert!(result.is_err());
3486    }
3487
3488    #[test]
3489    fn test_cannot_transfer_master_when_not_master() {
3490        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3491        env.become_replica("other").unwrap();
3492
3493        let config = MasterTransferConfig::new(
3494            "target_node".to_string(),
3495            Duration::from_secs(30),
3496        );
3497        let result = env.transfer_master(config);
3498        assert!(result.is_err());
3499    }
3500
3501    #[test]
3502    fn test_transfer_master_requires_registered_target() {
3503        // F7: transfer_master is no longer a no-op; it sends an ADMIN
3504        // TRANSFER_MASTER signal to the target via TCP.  An unregistered
3505        // target is rejected at the address-resolution step.
3506        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3507        env.become_master(1).unwrap();
3508
3509        let config = MasterTransferConfig::new(
3510            "unknown_target".to_string(),
3511            Duration::from_secs(30),
3512        );
3513        let result = env.transfer_master(config);
3514        assert!(
3515            result.is_err(),
3516            "transfer_master to unregistered target must error"
3517        );
3518    }
3519
3520    #[test]
3521    fn test_apply_entry_registers_vlsn() {
3522        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3523        env.become_replica("master").unwrap();
3524
3525        env.apply_entry(1, 0, vec![1, 2, 3]).unwrap();
3526        env.apply_entry(2, 0, vec![4, 5, 6]).unwrap();
3527
3528        assert_eq!(env.get_current_vlsn(), 2);
3529    }
3530
3531    #[test]
3532    fn test_master_name_tracking() {
3533        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3534
3535        // Initially no master known
3536        assert!(env.get_master_name().is_none());
3537
3538        // After becoming master, this node is the master
3539        env.become_master(1).unwrap();
3540        assert_eq!(env.get_master_name(), Some("node1".to_string()));
3541    }
3542
3543    #[test]
3544    fn test_master_to_replica_transition() {
3545        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3546
3547        // Become master first
3548        env.become_master(1).unwrap();
3549        assert_eq!(env.get_master_name(), Some("node1".to_string()));
3550
3551        // Transition to replica (Master -> Replica is valid)
3552        env.become_replica("other_master").unwrap();
3553        assert_eq!(env.get_master_name(), Some("other_master".to_string()));
3554        assert!(env.is_replica());
3555    }
3556
3557    #[test]
3558    fn test_state_change_listener_notification() {
3559        struct TestListener {
3560            call_count: AtomicU32,
3561            last_new_state: noxu_sync::Mutex<Option<NodeState>>,
3562        }
3563
3564        impl StateChangeListener for TestListener {
3565            fn on_state_change(&self, event: StateChangeEvent) {
3566                self.call_count.fetch_add(1, AtomicOrdering::SeqCst);
3567                *self.last_new_state.lock() = Some(event.new_state);
3568            }
3569        }
3570
3571        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3572        let listener = Arc::new(TestListener {
3573            call_count: AtomicU32::new(0),
3574            last_new_state: noxu_sync::Mutex::new(None),
3575        });
3576
3577        // Setting the listener should trigger an immediate notification
3578        env.set_state_change_listener(listener.clone());
3579        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 1);
3580
3581        // State change should trigger another notification
3582        env.become_master(1).unwrap();
3583        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 2);
3584        assert_eq!(*listener.last_new_state.lock(), Some(NodeState::Master));
3585    }
3586
3587    #[test]
3588    fn test_close_notifies_listeners() {
3589        struct ShutdownListener {
3590            shutdown_seen: AtomicBool,
3591        }
3592
3593        impl StateChangeListener for ShutdownListener {
3594            fn on_state_change(&self, event: StateChangeEvent) {
3595                if event.new_state == NodeState::Shutdown {
3596                    self.shutdown_seen.store(true, AtomicOrdering::SeqCst);
3597                }
3598            }
3599        }
3600
3601        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3602        let listener = Arc::new(ShutdownListener {
3603            shutdown_seen: AtomicBool::new(false),
3604        });
3605
3606        // The initial notification is for the current (Detached) state
3607        env.set_state_change_listener(listener.clone());
3608
3609        // Become master first so the close transition is meaningful
3610        env.become_master(1).unwrap();
3611        assert!(!listener.shutdown_seen.load(AtomicOrdering::SeqCst));
3612
3613        env.close().unwrap();
3614        assert!(listener.shutdown_seen.load(AtomicOrdering::SeqCst));
3615    }
3616
3617    #[test]
3618    fn test_shutdown_group_requires_master() {
3619        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3620        env.become_replica("other").unwrap();
3621
3622        let result = env.shutdown_group(5000);
3623        assert!(result.is_err());
3624    }
3625
3626    #[test]
3627    fn test_shutdown_group_as_master() {
3628        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3629        env.become_master(1).unwrap();
3630
3631        let result = env.shutdown_group(5000);
3632        assert!(result.is_ok());
3633        assert!(env.is_shutdown());
3634    }
3635
3636    #[test]
3637    fn test_get_config() {
3638        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3639        assert_eq!(env.get_config().node_name, "node1");
3640        assert_eq!(env.get_config().group_name, "test_group");
3641    }
3642
3643    #[test]
3644    fn test_get_stats() {
3645        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3646        let _stats = env.get_stats();
3647        // Just verify we can access stats without panicking
3648    }
3649
3650    // -----------------------------------------------------------------------
3651    // TCP dispatcher tests (H-5 / H-7)
3652    // -----------------------------------------------------------------------
3653
3654    #[test]
3655    fn test_tcp_dispatcher_starts_on_new() {
3656        // Use port 0 so the OS assigns an ephemeral port.
3657        let env =
3658            ReplicatedEnvironment::new(test_config_port0("tcp_node")).unwrap();
3659        // The dispatcher must have started and bound a real port.
3660        let addr = env.bound_addr();
3661        assert!(addr.is_some(), "expected a bound address");
3662        let addr = addr.unwrap();
3663        assert_ne!(addr.port(), 0, "OS should assign a non-zero port");
3664    }
3665
3666    #[test]
3667    fn test_tcp_dispatcher_stops_on_close() {
3668        let env =
3669            ReplicatedEnvironment::new(test_config_port0("tcp_node2")).unwrap();
3670        // Dispatcher is running.
3671        assert!(
3672            env.tcp_dispatcher
3673                .as_ref()
3674                .map(|d| d.is_running())
3675                .unwrap_or(false)
3676        );
3677
3678        env.close().unwrap();
3679
3680        // After close, dispatcher must be stopped.
3681        assert!(
3682            !env.tcp_dispatcher
3683                .as_ref()
3684                .map(|d| d.is_running())
3685                .unwrap_or(false),
3686            "dispatcher should be stopped after close"
3687        );
3688    }
3689
3690    #[test]
3691    fn test_tcp_dispatcher_accepts_connection() {
3692        use crate::net::Channel;
3693        use crate::net::ServiceHandler;
3694        use crate::net::service_dispatcher::connect_to_service;
3695        use std::sync::atomic::{AtomicU32, Ordering as AO};
3696        use std::time::Duration;
3697
3698        struct PingHandler {
3699            count: AtomicU32,
3700        }
3701        impl ServiceHandler for PingHandler {
3702            fn service_name(&self) -> &str {
3703                "ping"
3704            }
3705            fn handle(&self, ch: Box<dyn Channel>) -> crate::error::Result<()> {
3706                self.count.fetch_add(1, AO::SeqCst);
3707                // Echo the first message back.
3708                if let Ok(Some(msg)) = ch.receive(Duration::from_secs(2)) {
3709                    let _ = ch.send(&msg);
3710                }
3711                Ok(())
3712            }
3713        }
3714
3715        let env =
3716            ReplicatedEnvironment::new(test_config_port0("tcp_node3")).unwrap();
3717        let addr = env.bound_addr().expect("dispatcher must be bound");
3718
3719        // Register a ping handler on the running dispatcher.
3720        if let Some(ref disp) = env.tcp_dispatcher {
3721            let handler = Arc::new(PingHandler { count: AtomicU32::new(0) });
3722            disp.register("ping", handler.clone());
3723
3724            // Give the accept thread a moment.
3725            std::thread::sleep(Duration::from_millis(20));
3726
3727            let client = connect_to_service(addr, "ping").unwrap();
3728            client.send(b"hello").unwrap();
3729            let reply = client.receive(Duration::from_secs(2)).unwrap();
3730            assert_eq!(reply, Some(b"hello".to_vec()));
3731
3732            assert_eq!(handler.count.load(AO::SeqCst), 1);
3733        }
3734
3735        env.close().unwrap();
3736    }
3737
3738    #[test]
3739    fn test_become_master_auto_transitions_from_detached() {
3740        // The state machine requires Detached -> Unknown -> Master.
3741        // become_master() should handle this automatically.
3742        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3743        assert_eq!(env.get_state(), NodeState::Detached);
3744        env.become_master(1).unwrap();
3745        assert_eq!(env.get_state(), NodeState::Master);
3746    }
3747
3748    #[test]
3749    fn test_become_replica_auto_transitions_from_detached() {
3750        // The state machine requires Detached -> Unknown -> Replica.
3751        // become_replica() should handle this automatically.
3752        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3753        assert_eq!(env.get_state(), NodeState::Detached);
3754        env.become_replica("master_node").unwrap();
3755        assert_eq!(env.get_state(), NodeState::Replica);
3756    }
3757
3758    #[test]
3759    fn test_cannot_transfer_master_when_shutdown() {
3760        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3761        env.become_master(1).unwrap();
3762        env.close().unwrap();
3763
3764        let config = MasterTransferConfig::new(
3765            "target".to_string(),
3766            Duration::from_secs(30),
3767        );
3768        let result = env.transfer_master(config);
3769        assert!(result.is_err());
3770    }
3771
3772    #[test]
3773    fn test_full_lifecycle() {
3774        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3775
3776        // Start as detached
3777        assert_eq!(env.get_state(), NodeState::Detached);
3778
3779        // Become master
3780        env.become_master(1).unwrap();
3781        assert!(env.is_master());
3782
3783        // Register some VLSNs
3784        env.register_vlsn(1, 0, 100);
3785        env.register_vlsn(2, 0, 200);
3786
3787        // Record ack from replica
3788        env.record_ack(1, "replica1");
3789        env.record_ack(2, "replica1");
3790
3791        // Transition to replica (simulating failover)
3792        env.become_replica("node2").unwrap();
3793        assert!(env.is_replica());
3794
3795        // Apply entries from new master
3796        env.apply_entry(3, 0, vec![7, 8, 9]).unwrap();
3797
3798        // Close
3799        env.close().unwrap();
3800        assert!(env.is_shutdown());
3801    }
3802
3803    /// Verify that `with_environment` lazily registers the RESTORE service on
3804    /// the TCP dispatcher when `config.env_home` was not set at construction.
3805    ///
3806    /// This mirrors`RepNode.envSetup()` which registers the restore handler
3807    /// when the environment is wired into the replicated node.
3808    #[test]
3809    fn test_restore_registered_lazily_via_with_environment() {
3810        use noxu_dbi::EnvironmentImpl;
3811        use tempfile::TempDir;
3812
3813        let dir = TempDir::new().expect("temp dir");
3814
3815        // Build config WITHOUT env_home — dispatcher starts, but no RESTORE handler yet.
3816        let config = RepConfig::builder("test_group", "node1", "127.0.0.1")
3817            .node_port(0)
3818            .build();
3819
3820        let rep_env = ReplicatedEnvironment::new(config).unwrap();
3821
3822        // Not yet registered.
3823        assert!(
3824            !rep_env
3825                .restore_registered
3826                .load(std::sync::atomic::Ordering::SeqCst)
3827        );
3828
3829        // Wire in a real EnvironmentImpl so get_env_home() returns the temp dir.
3830        let env_impl = Arc::new(
3831            EnvironmentImpl::new(dir.path(), false, false).expect("open env"),
3832        );
3833        rep_env.with_environment(env_impl);
3834
3835        // Now the RESTORE service must be registered.
3836        assert!(
3837            rep_env
3838                .restore_registered
3839                .load(std::sync::atomic::Ordering::SeqCst)
3840        );
3841    }
3842
3843    /// Verify that when `config.env_home` IS set at construction, the RESTORE
3844    /// service is registered immediately (not deferred).
3845    #[test]
3846    fn test_restore_registered_eagerly_when_env_home_in_config() {
3847        use tempfile::TempDir;
3848
3849        let dir = TempDir::new().expect("temp dir");
3850
3851        let config = RepConfig::builder("test_group", "node2", "127.0.0.1")
3852            .node_port(0)
3853            .env_home(dir.path())
3854            .build();
3855
3856        let rep_env = ReplicatedEnvironment::new(config).unwrap();
3857
3858        // Should be registered immediately (env_home was in config).
3859        assert!(
3860            rep_env
3861                .restore_registered
3862                .load(std::sync::atomic::Ordering::SeqCst)
3863        );
3864    }
3865}