Skip to main content

noxu_rep/
replicated_environment.rs

1//! The main replicated environment API.
2//!
3//!
4//! A replicated database environment that is a node in a replication group.
5//! This is the entry point for replication. It wraps a standard Environment
6//! and adds replication capabilities including master election, replica
7//! streaming, and commit acknowledgments.
8//!
9//! # Replication node states
10//!
11//! The replication node state determines the operations that the application
12//! can perform against its replicated environment. The state transitions
13//! visible to the application can be summarized by the regular expression:
14//!
15//! ```text
16//! [ MASTER | REPLICA | UNKNOWN ]+ DETACHED
17//! ```
18//!
19//! When the first handle to a `ReplicatedEnvironment` is created and the node
20//! is brought up, the node usually establishes Master or Replica state. These
21//! states are preceded by the Unknown state. As various remote nodes become
22//! unavailable and elections are held, the local node may change between
23//! Master and Replica states, always with a (usually brief) transition through
24//! Unknown state.
25//!
26//! When the environment is closed, the node transitions to the Detached state.
27
28use noxu_dbi::{
29    AckWaitError, AckWaitErrorKind, EnvironmentImpl, ReplicaAckCoordinator,
30    ReplicaAckPolicyKind,
31};
32use noxu_sync::RwLock;
33use std::net::SocketAddr;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::OnceLock;
37use std::sync::Weak;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use crate::ack_tracker::AckTracker;
42use crate::elections::election_service::{
43    ELECTION_SERVICE_NAME, ElectionAcceptorState, ElectionService,
44};
45use crate::elections::master_tracker::MasterTracker;
46use crate::error::{RepError, Result};
47use crate::group_service::GroupService;
48use crate::master_transfer::MasterTransferConfig;
49use crate::net::service_dispatcher::{
50    AnyServiceDispatcher, TcpServiceDispatcher,
51};
52use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
53use crate::network_restore_server::{
54    NetworkRestoreServer, RESTORE_SERVICE_NAME,
55};
56use crate::node_state::{NodeState, NodeStateMachine};
57use crate::rep_config::RepConfig;
58use crate::rep_stats::RepStats;
59use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
60use crate::stream::feeder::Feeder;
61use crate::stream::peer_feeder::{
62    PEER_FEEDER_SERVICE_NAME, PeerFeederService, PeerLogScanner,
63};
64use crate::stream::replica_stream::{EnvironmentLogWriter, ReplicaStream};
65use crate::vlsn::vlsn_index::VlsnIndex;
66use crate::vlsn::vlsn_range::VlsnRange;
67
/// Default heartbeat timeout for master liveness detection.
///
/// Handed to `MasterTracker::new` when a `ReplicatedEnvironment` is
/// constructed.
const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);
70
/// A replicated database environment.
///
/// This is the entry point for replication. It wraps a standard Environment
/// and adds replication capabilities including master election, replica
/// streaming, and commit acknowledgments.
///
/// High Availability (HA) provides a replicated, embedded database
/// management system which provides fast, reliable, and scalable data
/// management. HA enables replication of an environment across a Replication
/// Group. A `ReplicatedEnvironment` is a single node in the replication group.
///
/// `ReplicatedEnvironment` wraps a standard `Environment`. All database
/// operations are executed in the same fashion in both replicated and
/// non-replicated applications. A `ReplicatedEnvironment` must be
/// transactional. All replicated databases created in the replicated
/// environment must be transactional as well.
///
/// `open()` is the production entry point: it constructs the node and starts
/// the background election driver that resolves the node into Master or
/// Replica state. `new()` only constructs the node and registers its network
/// services; callers that use it directly drive state transitions themselves.
///
/// Replicated environments can be created with node type Electable or
/// Secondary. Electable nodes can be masters or replicas, and participate in
/// both master elections and commit durability decisions. Secondary nodes can
/// only be replicas, not masters, and do not participate in either elections or
/// durability decisions.
///
/// # Example
///
/// ```ignore
/// use noxu_rep::{ReplicatedEnvironment, RepConfig};
///
/// let config = RepConfig::builder("my_group", "node1", "localhost")
///     .node_port(5001)
///     .build();
/// let rep_env = ReplicatedEnvironment::new(config).unwrap();
/// ```
pub struct ReplicatedEnvironment {
    /// The replication configuration for this node.
    config: RepConfig,
    /// Tracks the current node state (Detached, Unknown, Master, Replica).
    node_state: NodeStateMachine,
    /// Service for managing the replication group membership.
    group_service: GroupService,
    /// Maps VLSNs to log file positions.
    ///
    /// Wrapped in `Arc` so that background daemons (election driver,
    /// VLSN-index persistence flusher) can share access without
    /// borrowing the env.  Closes finding F11 of
    /// `docs/src/internal/api-audit-2026-05-rep.md`.
    vlsn_index: Arc<VlsnIndex>,
    /// Tracks acknowledgments from replicas (used by master).
    ack_tracker: AckTracker,
    /// Replication statistics.
    stats: RepStats,
    /// Active feeder threads (master -> replica streams).
    feeders: RwLock<Vec<Feeder>>,
    /// Replica stream for receiving updates from the master.
    replica_stream: ReplicaStream,
    /// Tracks the current master node.
    master_tracker: MasterTracker,
    /// State change listeners.
    listeners: RwLock<Vec<Arc<dyn StateChangeListener>>>,
    /// Shutdown flag.
    shutdown: AtomicBool,
    /// Service dispatcher — listens on the replication port and routes
    /// incoming connections to the appropriate service handler (feeder, etc.).
    ///
    /// `Plain`: plain TCP (default / Phase-2 behaviour).
    /// `Tls`: TLS + mTLS enforcement (Phase 3, when `RepConfig::tls_config` is set
    /// and `transport_kind` is `Tls`).
    ///
    /// `None` when the listen address was unusable or the dispatcher failed
    /// to start; both cases are logged as warnings during construction.
    tcp_dispatcher: Option<AnyServiceDispatcher>,
    /// The address the `tcp_dispatcher` is actually bound to (may differ from
    /// the configured port when port 0 is used in tests).  `None` exactly
    /// when `tcp_dispatcher` is `None`.
    bound_addr: Option<SocketAddr>,

    /// Optional live `EnvironmentImpl` wired in via [`with_environment`].
    ///
    /// When set, `become_master` spawns a `FeederRunner` per replica using
    /// `EnvironmentLogScanner`, and `become_replica` spawns a
    /// `ReplicaReceiver` thread using `EnvironmentLogWriter`.
    ///
    /// Starts out as `None`.  The VLSN persistence daemon also reads it to
    /// find the last checkpoint end LSN.
    env_impl: StdMutex<Option<Arc<EnvironmentImpl>>>,

    /// Background thread handles (election driver, VLSN persistence daemon,
    /// I/O threads spawned during state transitions).
    ///
    /// Kept in a mutex-guarded `Vec` so that `close()` can join them
    /// cleanly.  The thread names stored in these handles are also what the
    /// `start_*` methods inspect to avoid spawning a daemon twice.
    io_threads: StdMutex<Vec<std::thread::JoinHandle<()>>>,

    /// Shutdown flag shared with I/O threads so they terminate when the
    /// environment is closed.
    io_shutdown: AtomicBool,

    /// Whether the RESTORE service has been registered on the TCP dispatcher.
    ///
    /// When `config.env_home` is `None` at construction time, registration is
    /// deferred until `with_environment()` provides the env home path.
    restore_registered: AtomicBool,

    /// In-memory log queue used by the peer feeder service.
    ///
    /// When this node is a replica, `apply_entry()` pushes each received log
    /// entry here.  The `PeerFeederService` registered on the TCP dispatcher
    /// reads from this queue to stream entries to downstream replicas that
    /// are behind this node (peer-to-peer log distribution, HA style).
    peer_scanner: Arc<PeerLogScanner>,

    /// Monotonic sequence used by `await_replica_acks` to assign unique
    /// keys to in-flight commits awaiting replica acknowledgment.  In
    /// production this should track the real master VLSN; until F11
    /// closes the VLSN<->commit linkage, the coordinator uses a
    /// synthetic sequence so that ack tracking is unique per commit.
    /// Starts at 1.
    /// See finding F1 in `docs/src/internal/api-audit-2026-05-rep.md`.
    commit_ack_seq: std::sync::atomic::AtomicU64,

    /// Shared acceptor state used by the ELECTION service handler.
    /// The election driver updates `own_vlsn` / `own_term` here as the
    /// node progresses; incoming acceptor sessions read it on every
    /// connection so their replies always reflect the local node's
    /// most recent state.  Closes finding F6.
    election_state: Arc<ElectionAcceptorState>,

    /// Self-referential `Weak` populated once the env has been wrapped
    /// in an `Arc`.  Used by the replica I/O thread spawned in
    /// `become_replica` so it can call `bootstrap_via_dispatcher` when
    /// the master signals `NeedsRestore`.
    ///
    /// Populated lazily via [`Self::init_self_weak`] from `open()` and
    /// the test harness.  When unset (callers that build the env via
    /// raw `Arc::new(Self::new(...))` and never call `init_self_weak`)
    /// the I/O thread falls back to operator-driven bootstrap.
    self_weak: OnceLock<Weak<Self>>,
}
210
211impl ReplicatedEnvironment {
212    /// Create a new replicated environment.
213    ///
214    ///
215    ///
216    /// Creates a replicated environment handle and starts participating in the
217    /// replication group. The node's state is determined when it joins the
218    /// group, and mastership is not preconfigured. If the group has no current
219    /// master, creation will trigger an election to determine whether this node
220    /// will participate as a Master or a Replica.
221    ///
222    /// A brand new node will always join an existing group as a Replica, unless
223    /// it is the very first electable node that is creating the group. In that
224    /// case it joins as the Master of the newly formed singleton group.
225    pub fn new(config: RepConfig) -> Result<Self> {
226        // mTLS Phase 2 (v3.1.0): peer_allowlist enforcement is real at the
227        // TLS channel layer (TlsTcpChannelListener::bind_with_tls_and_allowlist).
228        // Phase 3 (this release): when RepConfig::tls_config is set AND
229        // transport_kind is Tls, the service dispatcher itself enforces mTLS
230        // via TlsTcpServiceDispatcher.  For the remaining cases (no TlsConfig
231        // or non-TLS transport) keep the Phase-2 accurate warn.
232        if !config.peer_allowlist.is_empty() {
233            match config.transport_kind {
234                crate::rep_config::RepTransportKind::Tls => {
235                    if config.tls_config.is_some() {
236                        log::info!(
237                            "[{}] peer_allowlist ({} entries) + tls_config set; \
238                             mTLS will be enforced on the service dispatcher.",
239                            config.node_name,
240                            config.peer_allowlist.len(),
241                        );
242                    } else {
243                        log::info!(
244                            "[{}] peer_allowlist configured ({} entries) but \
245                             tls_config is None — the service dispatcher will \
246                             use plain TCP. Set RepConfig::tls_config to \
247                             activate end-to-end mTLS on this path.",
248                            config.node_name,
249                            config.peer_allowlist.len(),
250                        );
251                    }
252                }
253                _ => {
254                    log::warn!(
255                        "[{}] peer_allowlist is configured ({} entries) but \
256                         transport_kind is not Tls — the allowlist has no \
257                         effect without TLS transport. Set \
258                         RepTransportKind::Tls to activate mTLS enforcement.",
259                        config.node_name,
260                        config.peer_allowlist.len(),
261                    );
262                }
263            }
264        }
265        let node_state = NodeStateMachine::new();
266        let group_service = GroupService::new(config.group_name.clone());
267        let vlsn_index = {
268            // F11: try to load a previously persisted vlsn.idx from
269            // env_home if one exists.  A successfully loaded index lets a
270            // restarted replica resume from where it left off without a
271            // full network restore; a missing or corrupt file falls back
272            // to a fresh in-memory index (caller will need to bootstrap).
273            if let Some(ref home) = config.env_home {
274                match crate::vlsn::persist::load_from_disk(home) {
275                    Ok(Some(idx)) => {
276                        log::info!(
277                            "Node '{}' loaded persisted VLSN index from {} \
278                             ({} entries, latest vlsn={})",
279                            config.node_name,
280                            home.display(),
281                            idx.snapshot_entries().len(),
282                            idx.get_latest_vlsn(),
283                        );
284                        Arc::new(idx)
285                    }
286                    Ok(None) => Arc::new(VlsnIndex::new(10)),
287                    Err(e) => {
288                        log::warn!(
289                            "Node '{}' failed to load persisted VLSN index \
290                             from {}: {} (treating as fresh node — network \
291                             restore required)",
292                            config.node_name,
293                            home.display(),
294                            e
295                        );
296                        // Best-effort: remove the corrupt file so the
297                        // next persist cycle writes a clean one.  A
298                        // missing file is the "fresh node" baseline.
299                        let _ = std::fs::remove_file(
300                            crate::vlsn::persist::index_path(home),
301                        );
302                        Arc::new(VlsnIndex::new(10))
303                    }
304                }
305            } else {
306                Arc::new(VlsnIndex::new(10))
307            }
308        };
309        let ack_tracker = AckTracker::new();
310        let stats = RepStats::new();
311        let feeders = RwLock::new(Vec::new());
312        let replica_stream = ReplicaStream::new();
313        let master_tracker = MasterTracker::new(DEFAULT_HEARTBEAT_TIMEOUT);
314
315        // Start the service dispatcher.
316        //
317        // Phase 3: when RepConfig::tls_config is set AND transport_kind is Tls,
318        // start a TlsTcpServiceDispatcher (mTLS enforced).  Otherwise fall back
319        // to the plain-TCP TcpServiceDispatcher.
320        let listen_addr_str =
321            format!("{}:{}", config.node_host, config.node_port);
322        let mut restore_registered_init = false;
323
324        // Returns (AnyServiceDispatcher, bound_addr) or (None, None) on error.
325        let (tcp_dispatcher, bound_addr) = match listen_addr_str
326            .parse::<SocketAddr>()
327        {
328            Ok(addr) => {
329                let build_result: Result<(AnyServiceDispatcher, SocketAddr)> =
330                    Self::build_dispatcher(&config, addr);
331                match build_result {
332                    Ok((dispatcher, bound)) => {
333                        // Register the network restore handler.
334                        if let Some(ref home) = config.env_home {
335                            let restore_server =
336                                NetworkRestoreServer::new(home.clone());
337                            dispatcher.register(
338                                RESTORE_SERVICE_NAME,
339                                Arc::new(restore_server),
340                            );
341                            log::debug!(
342                                "Node '{}' RESTORE service registered \
343                                     (env_home={})",
344                                config.node_name,
345                                home.display(),
346                            );
347                            restore_registered_init = true;
348                        }
349                        let kind =
350                            if dispatcher.is_tls() { "TLS" } else { "TCP" };
351                        log::info!(
352                            "Node '{}' {} service dispatcher started on {}",
353                            config.node_name,
354                            kind,
355                            bound
356                        );
357                        (Some(dispatcher), Some(bound))
358                    }
359                    Err(e) => {
360                        log::warn!(
361                            "Node '{}' failed to start service dispatcher \
362                             on {}: {}",
363                            config.node_name,
364                            listen_addr_str,
365                            e
366                        );
367                        (None, None)
368                    }
369                }
370            }
371            Err(e) => {
372                log::warn!(
373                    "Node '{}' cannot parse listen address '{}': {}",
374                    config.node_name,
375                    listen_addr_str,
376                    e
377                );
378                (None, None)
379            }
380        };
381
382        // Build the in-memory peer log scanner; register the peer feeder
383        // service on the dispatcher so downstream replicas can connect.
384        let peer_scanner = Arc::new(PeerLogScanner::new());
385        // F5/F31: build the acceptor state with persistence enabled when
386        // env_home is configured.  Crash-durable promises are required
387        // for the Paxos safety invariant after a process restart.
388        let election_state =
389            Arc::new(if let Some(ref home) = config.env_home {
390                ElectionAcceptorState::with_env_home(
391                    config.node_name.clone(),
392                    1,
393                    home,
394                )
395            } else {
396                ElectionAcceptorState::new(config.node_name.clone(), 1)
397            });
398        if let Some(ref dispatcher) = tcp_dispatcher {
399            let service = PeerFeederService::new(Arc::clone(&peer_scanner));
400            dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(service));
401            log::debug!(
402                "Node '{}' PEER_FEEDER service registered",
403                config.node_name,
404            );
405            // F6: register the ELECTION service so peers can run
406            // run_acceptor against this node when proposing.
407            let election_svc =
408                Arc::new(ElectionService::new(Arc::clone(&election_state)));
409            dispatcher.register(ELECTION_SERVICE_NAME, election_svc);
410            log::debug!(
411                "Node '{}' ELECTION service registered",
412                config.node_name,
413            );
414        }
415
416        let env = Self {
417            config,
418            node_state,
419            group_service,
420            vlsn_index,
421            ack_tracker,
422            stats,
423            feeders,
424            replica_stream,
425            master_tracker,
426            listeners: RwLock::new(Vec::new()),
427            shutdown: AtomicBool::new(false),
428            tcp_dispatcher,
429            bound_addr,
430            env_impl: StdMutex::new(None),
431            io_threads: StdMutex::new(Vec::new()),
432            io_shutdown: AtomicBool::new(false),
433            restore_registered: AtomicBool::new(restore_registered_init),
434            peer_scanner,
435            commit_ack_seq: std::sync::atomic::AtomicU64::new(1),
436            election_state,
437            self_weak: OnceLock::new(),
438        };
439
440        Ok(env)
441    }
442
443    /// Open a replicated environment with the standard production
444    /// lifecycle.
445    ///
446    /// This is the entry point recommended by the mdBook chapters:
447    /// it allocates the `ReplicatedEnvironment`, registers all
448    /// services on the TCP dispatcher, and spawns the **election
449    /// driver** background thread that runs Paxos rounds against
450    /// known peers until the node has resolved into either Master or
451    /// Replica state.
452    ///
453    /// Closes finding F6 of `docs/src/internal/api-audit-2026-05-rep.md`.
454    ///
455    /// Use [`ReplicatedEnvironment::new`] directly only when the
456    /// caller plans to drive state transitions explicitly (test
457    /// harnesses, scripted bootstrap, recovery tooling).
458    pub fn open(config: RepConfig) -> Result<Arc<Self>> {
459        let env = Arc::new(Self::new(config)?);
460        env.init_self_weak();
461        env.start_election_driver();
462        env.start_vlsn_persistence_daemon();
463        env.register_admin_service();
464        Ok(env)
465    }
466
467    /// Build the service dispatcher for this node.
468    ///
469    /// Phase 3 logic: when `config.transport_kind == Tls` AND
470    /// `config.tls_config` is `Some`, start a
471    /// [`crate::net::service_dispatcher::TlsTcpServiceDispatcher`] that
472    /// enforces mTLS with the configured `peer_allowlist`.  Otherwise
473    /// start the plain-TCP [`TcpServiceDispatcher`].
474    ///
475    /// Returns `(dispatcher, bound_addr)` or a `RepError` on bind / TLS
476    /// config failure.
477    fn build_dispatcher(
478        #[cfg_attr(not(feature = "tls-rustls"), allow(unused_variables))]
479        config: &RepConfig,
480        addr: SocketAddr,
481    ) -> Result<(AnyServiceDispatcher, SocketAddr)> {
482        #[cfg(feature = "tls-rustls")]
483        if config.transport_kind == crate::rep_config::RepTransportKind::Tls {
484            use crate::auth::PeerAllowlist;
485            use crate::net::service_dispatcher::TlsTcpServiceDispatcher;
486            let tls = config.tls_config.as_ref().ok_or_else(|| {
487                RepError::ConfigError(format!(
488                    "node '{}': transport_kind=Tls requires a tls_config",
489                    config.node_name,
490                ))
491            })?;
492            let allowlist =
493                PeerAllowlist::new(config.peer_allowlist.iter().cloned());
494            // Fail-closed: an empty allowlist with TLS transport is a
495            // misconfiguration. The same policy is enforced at the TLS
496            // listener and QUIC constructors; downgrading to plain TCP here
497            // would be a silent security regression for a node that asked
498            // for TLS.
499            if allowlist.is_empty() {
500                return Err(RepError::ConfigError(format!(
501                    "node '{}': transport_kind=Tls requires a non-empty \
502                     peer_allowlist (mTLS enforcement is fail-closed)",
503                    config.node_name,
504                )));
505            }
506            let disp = TlsTcpServiceDispatcher::new(addr, tls, allowlist)?;
507            let bound = disp.start()?;
508            return Ok((AnyServiceDispatcher::Tls(disp), bound));
509        }
510        // Plain-TCP dispatcher (default or when TLS config is missing).
511        let disp = TcpServiceDispatcher::new(addr).map_err(|e| {
512            RepError::NetworkError(format!("TCP dispatcher init: {e}"))
513        })?;
514        let bound = disp.start()?;
515        Ok((AnyServiceDispatcher::Plain(disp), bound))
516    }
517
518    /// Populate the env's self-referential `Weak` so background
519    /// threads can obtain a back-reference for auto-orchestrated
520    /// follow-up actions (e.g. replica auto-bootstrap on
521    /// `NeedsRestore`).  Idempotent: subsequent calls are silent
522    /// no-ops because the inner [`OnceLock`] only accepts one set.
523    ///
524    /// Callers that wrap the env in `Arc` and want auto-bootstrap
525    /// behaviour should call this immediately after construction.
526    /// `Self::open` already does so.  Test harnesses that drive
527    /// transitions manually (`RepTestBase`) also call this so the
528    /// auto-bootstrap path is exercised in tests.
529    pub fn init_self_weak(self: &Arc<Self>) {
530        let _ = self.self_weak.set(Arc::downgrade(self));
531    }
532
533    /// Register the `ADMIN` service handler on the TCP dispatcher.
534    ///
535    /// Closes findings F7 / F8.  Holds a `Weak<Self>` so the handler
536    /// does not extend the env's lifetime.  Idempotent: re-registering
537    /// is harmless because `TcpServiceDispatcher::register` overwrites
538    /// the existing handler.
539    pub fn register_admin_service(self: &Arc<Self>) {
540        if let Some(ref dispatcher) = self.tcp_dispatcher {
541            crate::group_admin::register_admin_service(
542                dispatcher,
543                Arc::downgrade(self),
544            );
545            log::debug!(
546                "Node '{}' ADMIN service registered",
547                self.config.node_name,
548            );
549        }
550    }
551
552    /// Spawn the VLSN-index persistence daemon (F11).
553    ///
554    /// Periodically (every 2 seconds) snapshots the in-memory
555    /// `VlsnIndex` to `<env_home>/vlsn.idx` so a clean restart can
556    /// resume from where the replica left off without a full network
557    /// restore.  No-op when `config.env_home` is `None`.
558    ///
559    /// Idempotent: only one daemon is ever spawned per env.
560    pub fn start_vlsn_persistence_daemon(self: &Arc<Self>) {
561        let Some(home) = self.config.env_home.clone() else {
562            return;
563        };
564        {
565            let threads = self.io_threads.lock().unwrap();
566            if threads.iter().any(|h| {
567                h.thread()
568                    .name()
569                    .is_some_and(|n| n.starts_with("noxu-vlsn-flush-"))
570            }) {
571                return;
572            }
573        }
574
575        let vlsn_index = Arc::clone(&self.vlsn_index);
576        let name = format!("noxu-vlsn-flush-{}", self.config.node_name);
577        let me = Arc::clone(self);
578        let interval = Duration::from_secs(2);
579
580        let handle = std::thread::Builder::new()
581            .name(name)
582            .spawn(move || {
583                use std::sync::atomic::Ordering;
584                let mut last_persisted_vlsn: u64 = 0;
585                while !me.io_shutdown.load(Ordering::SeqCst)
586                    && !me.is_shutdown()
587                {
588                    std::thread::sleep(interval);
589                    if me.io_shutdown.load(Ordering::SeqCst) {
590                        break;
591                    }
592                    let latest = vlsn_index.get_latest_vlsn();
593                    if latest == last_persisted_vlsn {
594                        // Nothing new to flush.
595                        continue;
596                    }
597                    // X-2: cap the flush at the last durable checkpoint's
598                    // end LSN so the persisted VLSN index never claims
599                    // VLSNs beyond the durable B-tree state.  After a crash
600                    // the recovered tree and the index will be coherent.
601                    let cap_lsn = me
602                        .env_impl
603                        .lock()
604                        .unwrap()
605                        .as_ref()
606                        .and_then(|e| e.get_checkpointer())
607                        .map(|c| c.get_last_checkpoint_end())
608                        .unwrap_or(noxu_util::NULL_LSN);
609                    match crate::vlsn::persist::flush_to_disk_capped(
610                        &vlsn_index,
611                        &home,
612                        cap_lsn,
613                    ) {
614                        Ok(n) => {
615                            log::trace!(
616                                "vlsn-flush: persisted {} entries (latest vlsn={}, cap_lsn={:?})",
617                                n,
618                                latest,
619                                cap_lsn,
620                            );
621                            last_persisted_vlsn = latest;
622                        }
623                        Err(e) => {
624                            log::warn!(
625                                "vlsn-flush: failed to persist VLSN index to {}: {}",
626                                home.display(),
627                                e
628                            );
629                        }
630                    }
631                }
632                // Final flush on shutdown so a clean close is recoverable.
633                // Cap at the last checkpoint even for the shutdown flush.
634                let cap_lsn = me
635                    .env_impl
636                    .lock()
637                    .unwrap()
638                    .as_ref()
639                    .and_then(|e| e.get_checkpointer())
640                    .map(|c| c.get_last_checkpoint_end())
641                    .unwrap_or(noxu_util::NULL_LSN);
642                if let Err(e) = crate::vlsn::persist::flush_to_disk_capped(
643                    &vlsn_index,
644                    &home,
645                    cap_lsn,
646                ) {
647                    log::warn!(
648                        "vlsn-flush (final): failed to persist VLSN index: {}",
649                        e
650                    );
651                }
652            })
653            .expect("failed to spawn noxu-vlsn-flush thread");
654
655        self.io_threads.lock().unwrap().push(handle);
656        log::debug!(
657            "Node '{}' VLSN persistence daemon started",
658            self.config.node_name,
659        );
660    }
661
662    /// Spawn the election driver background thread.
663    ///
664    /// While the env is in `Detached` or `Unknown` state and no master
665    /// is known, the driver periodically attempts a Paxos election
666    /// against peers in `GroupService` (whose ELECTION services were
667    /// registered at `open()` time).  On success the driver calls
668    /// `become_master` (if this node is the winner) or `become_replica`
669    /// (otherwise).  On failure (no quorum), the driver waits
670    /// `config.election_timeout` and tries again.
671    ///
672    /// The driver respects `io_shutdown`; on env close the loop exits
673    /// promptly.
674    ///
675    /// Idempotent: a second call is a no-op (only one driver thread is
676    /// ever spawned per env).
677    pub fn start_election_driver(self: &Arc<Self>) {
678        use std::sync::atomic::Ordering;
679        // Reuse io_shutdown for cancellation; a successful spawn is
680        // recorded by appending to io_threads, so a duplicate call
681        // would re-add a thread — we use a one-shot `AtomicBool`
682        // sentinel placed in the io_shutdown's slot via a new field.
683        // Cheaper: a static name check on io_threads is impossible;
684        // instead, gate spawning on whether any io_thread already
685        // carries the driver name.
686        {
687            let threads = self.io_threads.lock().unwrap();
688            if threads.iter().any(|h| {
689                h.thread()
690                    .name()
691                    .is_some_and(|n| n.starts_with("noxu-election-"))
692            }) {
693                return;
694            }
695        }
696
697        let me = Arc::clone(self);
698        let name = format!("noxu-election-{}", self.config.node_name);
699        let handle = std::thread::Builder::new()
700            .name(name)
701            .spawn(move || {
702                me.run_election_loop();
703            })
704            .expect("failed to spawn election driver thread");
705        self.io_threads.lock().unwrap().push(handle);
706        log::debug!("Node '{}' election driver started", self.config.node_name,);
707        // Keep ordering sane on the io_shutdown flag.
708        let _ = self.io_shutdown.load(Ordering::SeqCst);
709    }
710
711    /// Body of the election driver loop.  Public only for tests; called
712    /// by [`Self::start_election_driver`].
713    fn run_election_loop(self: Arc<Self>) {
714        use std::sync::atomic::Ordering;
715        // Maintain an internal monotonically increasing election term.
716        // Each successful or failed round bumps the term so retries do
717        // not collide with stale acceptor promises.
718        let mut term: u64 = 1;
719
720        loop {
721            if self.io_shutdown.load(Ordering::SeqCst) {
722                return;
723            }
724            if self.is_shutdown() {
725                return;
726            }
727
728            let state = self.node_state.get_state();
729            // Stop driving once we've resolved into Master/Replica;
730            // re-arm only if the node returns to Unknown.
731            if matches!(state, NodeState::Master | NodeState::Replica) {
732                std::thread::sleep(Duration::from_millis(200));
733                continue;
734            }
735            if matches!(state, NodeState::Shutdown) {
736                return;
737            }
738
739            // Probe peers for an active master via the existing
740            // GroupService cache.  In the absence of a heartbeat path
741            // we rely on master_tracker (set by become_replica from
742            // the receive loop).
743            if let Some(master_name) = self.master_tracker.get_master()
744                && master_name != self.config.node_name
745            {
746                let _ = self.become_replica(&master_name);
747                continue;
748            }
749
750            // Snapshot peers to dial for ELECTION.
751            let peers: Vec<(String, SocketAddr)> = self
752                .group_service
753                .get_all_nodes()
754                .into_iter()
755                .filter(|n| n.name != self.config.node_name)
756                .filter_map(|n| {
757                    format!("{}:{}", n.host, n.port)
758                        .parse::<SocketAddr>()
759                        .ok()
760                        .map(|a| (n.name, a))
761                })
762                .collect();
763
764            // Build the local rep group view used by run_election to
765            // compute quorum and resolve the winner name.  Include
766            // self.
767            let group = self.local_rep_group_with_self();
768
769            // Update election state for any concurrent acceptor calls.
770            let our_vlsn = self.vlsn_index.get_latest_vlsn();
771            self.election_state.set_vlsn(our_vlsn);
772            self.election_state.set_term(term);
773
774            // Connect to each peer's ELECTION service.  Failures are
775            // tolerated: a peer that doesn't answer simply contributes
776            // no vote.  The election may still reach quorum in the
777            // remaining peers.
778            let mut channels: Vec<Arc<dyn crate::net::channel::Channel>> =
779                Vec::new();
780            for (peer_name, addr) in &peers {
781                match crate::net::service_dispatcher::connect_to_service(
782                    *addr,
783                    ELECTION_SERVICE_NAME,
784                ) {
785                    Ok(ch) => {
786                        let arc: Arc<dyn crate::net::channel::Channel> =
787                            Arc::new(ch);
788                        channels.push(arc);
789                    }
790                    Err(e) => {
791                        log::trace!(
792                            "election driver: peer {} ({}) unreachable: {}",
793                            peer_name,
794                            addr,
795                            e
796                        );
797                    }
798                }
799            }
800
801            // Resolve our own node_id from the group; if not present
802            // we cannot run an election (closed-world guard — see F22).
803            let self_node_id =
804                group.get_node(&self.config.node_name).map(|n| n.node_id());
805            let self_node_id = match self_node_id {
806                Some(id) => id,
807                None => {
808                    log::warn!(
809                        "election driver: node '{}' not registered in \
810                         own group view; sleeping",
811                        self.config.node_name
812                    );
813                    std::thread::sleep(Duration::from_millis(200));
814                    continue;
815                }
816            };
817
818            log::debug!(
819                "election driver on '{}': starting term={} with {} peers",
820                self.config.node_name,
821                term,
822                channels.len(),
823            );
824            let outcome = crate::elections::paxos::run_election(
825                self_node_id,
826                &self.config.node_name,
827                &group,
828                &channels,
829                our_vlsn,
830                /* priority */ 1,
831                term,
832            );
833
834            match outcome {
835                Some(winner_id) if winner_id == self_node_id => {
836                    if let Err(e) = self.become_master(term) {
837                        log::warn!(
838                            "election driver: become_master failed: {}",
839                            e
840                        );
841                    } else {
842                        log::info!(
843                            "election driver: '{}' became master at term {}",
844                            self.config.node_name,
845                            term,
846                        );
847                    }
848                }
849                Some(winner_id) => {
850                    if let Some(winner_node) = group
851                        .get_nodes()
852                        .into_iter()
853                        .find(|n| n.node_id() == winner_id)
854                    {
855                        if let Err(e) = self.become_replica(&winner_node.name) {
856                            log::warn!(
857                                "election driver: become_replica failed: {}",
858                                e
859                            );
860                        } else {
861                            log::info!(
862                                "election driver: '{}' became replica of '{}' at term {}",
863                                self.config.node_name,
864                                winner_node.name,
865                                term,
866                            );
867                        }
868                    }
869                }
870                None => {
871                    log::debug!(
872                        "election driver on '{}' term={}: no quorum",
873                        self.config.node_name,
874                        term,
875                    );
876                }
877            }
878
879            term = term.saturating_add(1);
880            // Back off so we don't pin the loop on transient failures.
881            std::thread::sleep(
882                self.config.election_timeout.min(Duration::from_millis(500)),
883            );
884        }
885    }
886
887    /// Internal: a `RepGroup` snapshot that includes self.
888    fn local_rep_group_with_self(&self) -> crate::rep_group::RepGroup {
889        let mut group = self.get_rep_group();
890        // Ensure self is present in the group view; the
891        // group_service does not auto-register the local node.
892        if group.get_node(&self.config.node_name).is_none() {
893            let mut self_node = crate::rep_node::RepNode::new(
894                self.config.node_name.clone(),
895                self.config.node_type,
896                self.config.node_host.clone(),
897                self.config.node_port,
898                /* node_id */ 0,
899            );
900            // Stable self node_id derived from the name hash so
901            // re-creations in the same process don't collide.
902            use std::hash::{Hash, Hasher};
903            let mut hasher = std::collections::hash_map::DefaultHasher::new();
904            self.config.node_name.hash(&mut hasher);
905            // Restrict to a u32 range and avoid 0 (reserved for
906            // "unknown").
907            let id = ((hasher.finish() as u32) | 1).max(1);
908            self_node.node_id = id;
909            group.add_node(self_node);
910        }
911        group
912    }
913
    /// Return the socket address the TCP service dispatcher is bound to.
    ///
    /// This may differ from the configured `node_port` when port 0 is used
    /// (the OS assigns an ephemeral port).  Returns `None` if the dispatcher
    /// could not be started (e.g. the address is not resolvable).
    ///
    /// The value is a copy of the stored `bound_addr` field; this method
    /// performs no I/O.
    pub fn bound_addr(&self) -> Option<SocketAddr> {
        self.bound_addr
    }
922
923    /// Wire a live `EnvironmentImpl` into this replicated environment.
924    ///
925    /// After this call, state transitions (`become_master`, `become_replica`)
926    /// will spawn real feeder/receiver I/O threads backed by the live log.
927    ///
928    /// If the RESTORE service was not registered at construction time (because
929    /// `config.env_home` was `None`), it is registered here using the
930    /// environment's actual home path.  This mirrors`RepNode.envSetup()`
931    /// which registers the restore handler during environment wiring.
932    ///
933    /// Environment reference wiring.
934    /// `EnvironmentImpl` via `RepImpl.repNode.envImpl` in HA.
935    pub fn with_environment(&self, env: Arc<EnvironmentImpl>) {
936        // Register RESTORE service lazily if not already done.
937        if !self.restore_registered.load(Ordering::SeqCst)
938            && let Some(ref dispatcher) = self.tcp_dispatcher
939        {
940            let env_home = env.get_env_home().to_path_buf();
941            let restore_server = NetworkRestoreServer::new(env_home.clone());
942            dispatcher.register(RESTORE_SERVICE_NAME, Arc::new(restore_server));
943            self.restore_registered.store(true, Ordering::SeqCst);
944            log::debug!(
945                "Node '{}' RESTORE service registered via with_environment \
946                 (env_home={})",
947                self.config.node_name,
948                env_home.display(),
949            );
950        }
951
952        // X-14: rebuild the VLSN index from recovery-replayed LN entries.
953        // After a crash the on-disk vlsn.idx may be stale (either ahead of
954        // the recovered B-tree, or behind if vlsn.idx was not flushed
955        // after the last checkpoint).  Re-registering all (vlsn, lsn) pairs
956        // from the redo pass gives a consistent in-memory index.
957        if !env.recovery_vlsns.is_empty() {
958            log::info!(
959                "Node '{}': rebuilding VLSN index from {} recovered entries",
960                self.config.node_name,
961                env.recovery_vlsns.len(),
962            );
963            for &(vlsn, lsn_u64) in &env.recovery_vlsns {
964                let lsn = noxu_util::Lsn::from_u64(lsn_u64);
965                self.vlsn_index.register(
966                    vlsn,
967                    lsn.file_number(),
968                    lsn.file_offset(),
969                );
970            }
971        }
972
973        // X-1: truncate the VLSN index to the rollback matchpoint if recovery
974        // detected a completed rollback period.  The matchpoint is the highest
975        // LSN that is still valid after the rollback; entries with higher VLSNs
976        // correspond to data that was rolled back and must not appear in the
977        // index.
978        if let Some(matchpoint_lsn_u64) = env.recovery_rollback_matchpoint {
979            // Find the latest VLSN whose LSN is at or before the matchpoint.
980            // Scan the recovered VLSN pairs (sorted ascending) to find the
981            // boundary.
982            let safe_vlsn = env
983                .recovery_vlsns
984                .iter()
985                .rev()
986                .find(|&&(_, lsn_u64)| lsn_u64 <= matchpoint_lsn_u64)
987                .map(|&(vlsn, _)| vlsn)
988                .unwrap_or(0);
989            log::info!(
990                "Node '{}': truncating VLSN index after vlsn={} \
991                 (rollback matchpoint lsn={:#x})",
992                self.config.node_name,
993                safe_vlsn,
994                matchpoint_lsn_u64,
995            );
996            self.vlsn_index.truncate_after(safe_vlsn);
997        }
998
999        *self.env_impl.lock().unwrap() = Some(env);
1000    }
1001
    /// Get the current node state.
    ///
    /// Returns the current state of the node associated with this replication
    /// environment, as reported by the internal `node_state` tracker at the
    /// moment of the call.  If the caller's intent is to track the state of
    /// the node, `StateChangeListener` may be a more convenient and
    /// efficient approach.
    pub fn get_state(&self) -> NodeState {
        self.node_state.get_state()
    }
1012
1013    /// Check if this node is the master.
1014    ///
1015    /// Returns true if the node's current state is Master.
1016    pub fn is_master(&self) -> bool {
1017        self.node_state.get_state() == NodeState::Master
1018    }
1019
1020    /// Check if this node is a replica.
1021    ///
1022    /// Returns true if the node's current state is Replica.
1023    pub fn is_replica(&self) -> bool {
1024        self.node_state.get_state() == NodeState::Replica
1025    }
1026
    /// Returns true if the node is currently participating in the group
    /// as a Replica or a Master.
    ///
    /// Delegates to `NodeState::is_active` on the state read at the time
    /// of the call.
    pub fn is_active(&self) -> bool {
        self.node_state.get_state().is_active()
    }
1032
    /// Get the node name.
    ///
    /// Returns the unique name used to identify this replicated environment,
    /// borrowed from the configuration (`config.node_name`).
    pub fn get_node_name(&self) -> &str {
        self.config.node_name.as_str()
    }
1041
    /// Get the group name.
    ///
    /// Returns the name of the replication group this node belongs to,
    /// borrowed from the configuration (`config.group_name`).
    pub fn get_group_name(&self) -> &str {
        self.config.group_name.as_str()
    }
1048
    /// Get the current master (if known).
    ///
    /// Returns the name of the node that `master_tracker` currently records
    /// as master, or `None` if no master is recorded (e.g. the node is in
    /// the Unknown or Detached state and has not learned of a master yet).
    pub fn get_master_name(&self) -> Option<String> {
        self.master_tracker.get_master()
    }
1057
    /// Get the replication group info.
    ///
    /// Returns a reference to the `GroupService` holding the replication
    /// group as known by this node.  The replicated group metadata is stored
    /// in a replicated database and updates are propagated by the current
    /// master node to all replicas.  If this node is not the master, it is
    /// possible for its description of the group to be out of date.
    pub fn get_group(&self) -> &GroupService {
        &self.group_service
    }
1070
1071    /// Add a peer node to the replication group at runtime.
1072    ///
1073    /// The node is registered in the `GroupService` so elections and quorum
1074    /// calculations immediately reflect the new membership.
1075    pub fn add_peer(&self, node: crate::rep_node::RepNode) -> Result<()> {
1076        use crate::group_service::NodeInfo;
1077        use std::time::Instant;
1078
1079        let info = NodeInfo {
1080            name: node.name.clone(),
1081            node_type: node.node_type,
1082            host: node.host.clone(),
1083            port: node.port,
1084            node_id: node.node_id,
1085            joined_at: Instant::now(),
1086            last_seen: Instant::now(),
1087            is_active: true,
1088            known_vlsn: 0,
1089            log_range: None,
1090            read_capacity_pct: node.read_capacity_pct,
1091            write_capacity_pct: node.write_capacity_pct,
1092            latency_hint_ms: node.latency_hint_ms,
1093        };
1094        self.group_service.add_node(info)?;
1095        log::info!(
1096            "Node '{}': added peer '{}' ({}:{}) to group '{}'",
1097            self.config.node_name,
1098            node.name,
1099            node.host,
1100            node.port,
1101            self.config.group_name,
1102        );
1103
1104        // F9: if we are the current master, immediately register a
1105        // `Feeder` tracker for the new peer so AckTracker bookkeeping
1106        // and downstream pull-based streaming work without a forced
1107        // re-election.
1108        if self.is_master()
1109            && (node.node_type == crate::node_type::NodeType::Electable
1110                || node.node_type == crate::node_type::NodeType::Secondary)
1111        {
1112            let mut feeders = self.feeders.write();
1113            if !feeders.iter().any(|f| f.get_replica_name() == node.name) {
1114                feeders.push(Feeder::new(node.name.clone()));
1115                log::debug!(
1116                    "Node '{}' (master): dispatched Feeder for new peer '{}'",
1117                    self.config.node_name,
1118                    node.name,
1119                );
1120            }
1121        }
1122        Ok(())
1123    }
1124
1125    /// Remove a peer node from the replication group by name.
1126    ///
1127    /// The node is deregistered from the `GroupService`.  Elections initiated
1128    /// after this call will not include the removed node in quorum calculations.
1129    pub fn remove_peer(&self, name: &str) -> Result<()> {
1130        self.group_service.remove_node(name)?;
1131        log::info!(
1132            "Node '{}': removed peer '{}' from group '{}'",
1133            self.config.node_name,
1134            name,
1135            self.config.group_name,
1136        );
1137        Ok(())
1138    }
1139
1140    /// Update the capacity and latency metadata of an existing peer.
1141    ///
1142    /// Only the following fields are updated from `node`:
1143    ///   - `read_capacity_pct`
1144    ///   - `write_capacity_pct`
1145    ///   - `latency_hint_ms`
1146    ///
1147    /// The node's identity (name, address, port, node_type) is preserved.
1148    /// Safe to call while replication is active.
1149    ///
1150    /// If the quorum policy is `Flexible` or `Expression`, the quorum system
1151    /// is rebuilt to reflect the new capacity/latency weights.
1152    ///
1153    /// # Note
1154    ///
1155    /// `update_peer_metadata` does not currently re-run
1156    /// `QuorumPolicy::validate(electable_count)` after the metadata
1157    /// change.  An LP-optimal `Expression` quorum that was safe before
1158    /// the update may no longer satisfy the intersection property
1159    /// afterwards.  Until automatic revalidation lands, deployments
1160    /// using `QuorumPolicy::Expression` should call
1161    /// `quorum_policy().validate(get_rep_group().electable_count())`
1162    /// on the returned `RepGroup` after every metadata change and
1163    /// fail the operator-facing operation if validation reports
1164    /// unsafety.
1165    pub fn update_peer_metadata(
1166        &self,
1167        name: &str,
1168        node: crate::rep_node::RepNode,
1169    ) -> Result<()> {
1170        self.group_service.update_node_metadata(
1171            name,
1172            node.read_capacity_pct,
1173            node.write_capacity_pct,
1174            node.latency_hint_ms,
1175        )?;
1176        log::info!(
1177            "Node '{}': updated metadata for peer '{}' \
1178             (read_cap={}, write_cap={}, latency={}ms)",
1179            self.config.node_name,
1180            name,
1181            node.read_capacity_pct,
1182            node.write_capacity_pct,
1183            node.latency_hint_ms,
1184        );
1185        Ok(())
1186    }
1187
1188    /// Returns a snapshot of the current replication group as a `RepGroup`.
1189    ///
1190    /// The snapshot reflects the state at the time of the call; subsequent
1191    /// `add_peer` / `remove_peer` calls are not reflected in it.
1192    pub fn get_rep_group(&self) -> crate::rep_group::RepGroup {
1193        use crate::rep_group::RepGroup;
1194
1195        let mut group = RepGroup::new(
1196            self.config.group_name.clone(),
1197            self.group_service.get_group_id(),
1198        );
1199        for info in self.group_service.get_all_nodes() {
1200            let mut node = crate::rep_node::RepNode::new(
1201                info.name.clone(),
1202                info.node_type,
1203                info.host.clone(),
1204                info.port,
1205                info.node_id,
1206            );
1207            node.read_capacity_pct = info.read_capacity_pct;
1208            node.write_capacity_pct = info.write_capacity_pct;
1209            node.latency_hint_ms = info.latency_hint_ms;
1210            group.add_node(node);
1211        }
1212        group
1213    }
1214
    /// Get the replication configuration.
    ///
    /// Returns a reference to the replication configuration that has been
    /// used to create this environment.
    pub fn get_config(&self) -> &RepConfig {
        &self.config
    }
1224
    /// Get the current VLSN range on this node.
    ///
    /// Returns the range of VLSNs currently available on this node, as
    /// reported by the in-memory VLSN index.
    pub fn get_vlsn_range(&self) -> VlsnRange {
        self.vlsn_index.get_range()
    }
1231
    /// Get the latest VLSN.
    ///
    /// Returns the most recent VLSN registered on this node, as reported by
    /// the in-memory VLSN index.
    pub fn get_current_vlsn(&self) -> u64 {
        self.vlsn_index.get_latest_vlsn()
    }
1238
1239    /// Return the list of replica names that currently have a `Feeder`
1240    /// tracker on this (master) node.
1241    ///
1242    /// Used by tests and operator tooling.  The returned list reflects
1243    /// the master's view at the time of the call; subsequent
1244    /// `add_peer`/`remove_peer` calls may change it.
1245    pub fn feeder_replica_names(&self) -> Vec<String> {
1246        self.feeders.read().iter().map(|f| f.get_replica_name()).collect()
1247    }
1248
1249    /// Bootstrap this node's environment by network-restoring all `.ndb`
1250    /// files from `peer_name` via the dispatcher's RESTORE service.
1251    ///
1252    /// Closes findings F2 / F4 of `docs/src/internal/api-audit-2026-05-rep.md`.
1253    ///
1254    /// The standalone `NetworkRestore::execute()` opens raw TCP and
1255    /// expects to drive the legacy `NetworkRestoreServer::start` listener.
1256    /// Production replicated environments host the RESTORE handler on the
1257    /// dispatcher, so this method routes through `execute_via_dispatcher`.
1258    ///
1259    /// `peer_name` must be a known peer in `GroupService`; on success the
1260    /// peer's `.ndb` files are written into `config.env_home`.  Returns
1261    /// `Err` if `env_home` is `None`, the peer is unknown, or the restore
1262    /// fails for any reason.
1263    pub fn bootstrap_via_dispatcher(&self, peer_name: &str) -> Result<()> {
1264        let env_home = self.config.env_home.clone().ok_or_else(|| {
1265            RepError::ConfigError(
1266                "bootstrap_via_dispatcher requires env_home in RepConfig"
1267                    .into(),
1268            )
1269        })?;
1270        let peer_info = self
1271            .group_service
1272            .get_all_nodes()
1273            .into_iter()
1274            .find(|n| n.name == peer_name)
1275            .ok_or_else(|| {
1276                RepError::ConfigError(format!(
1277                    "peer '{}' not registered in group '{}'",
1278                    peer_name, self.config.group_name,
1279                ))
1280            })?;
1281
1282        let cfg = NetworkRestoreConfig {
1283            source_node: peer_info.name.clone(),
1284            source_host: peer_info.host.clone(),
1285            source_port: peer_info.port,
1286            retain_log_files: true,
1287        };
1288        let restore = NetworkRestore::new(cfg).with_local_dir(env_home);
1289        restore.execute_via_dispatcher()?;
1290        log::info!(
1291            "Node '{}' bootstrapped via dispatcher from '{}' ({}:{})",
1292            self.config.node_name,
1293            peer_info.name,
1294            peer_info.host,
1295            peer_info.port,
1296        );
1297        Ok(())
1298    }
1299
    /// Get replication statistics.
    ///
    /// Returns a reference to the `RepStats` associated with this
    /// environment.
    pub fn get_stats(&self) -> &RepStats {
        &self.stats
    }
1308
    /// Get the ack tracker.
    ///
    /// Returns a reference to the `AckTracker` owned by this environment.
    pub fn get_ack_tracker(&self) -> &AckTracker {
        &self.ack_tracker
    }
1313
1314    /// Ensure the node state machine is in Unknown state, transitioning
1315    /// from Detached if necessary. This is needed because the state machine
1316    /// only allows Detached -> Unknown -> Master/Replica.
1317    pub fn ensure_unknown_state(&self) -> Result<()> {
1318        let current = self.node_state.get_state();
1319        match current {
1320            NodeState::Unknown => Ok(()),
1321            NodeState::Detached => {
1322                self.node_state.transition_to(NodeState::Unknown)?;
1323                Ok(())
1324            }
1325            // Master and Replica must transition through Unknown before
1326            // joining a new group or reconnecting.
1327            NodeState::Master | NodeState::Replica => {
1328                self.node_state.transition_to(NodeState::Unknown)?;
1329                Ok(())
1330            }
1331            NodeState::Shutdown => {
1332                Err(RepError::StateError("Node is shut down".to_string()))
1333            }
1334        }
1335    }
1336
1337    /// Transition to master state.
1338    ///
1339    /// Transitions this node to Master state for the given election term.
1340    /// As master, the node can accept write operations and feed log entries
1341    /// to replicas.
1342    ///
1343    /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
1344    /// a `FeederRunner` + `EnvironmentLogScanner` background thread is spawned
1345    /// for each currently-registered replica (feeder entries in `feeders`).
1346    ///
1347    /// In HA.
1348    pub fn become_master(&self, term: u64) -> Result<()> {
1349        if self.is_shutdown() {
1350            return Err(RepError::StateError(
1351                "Cannot become master: environment is closed".to_string(),
1352            ));
1353        }
1354
1355        // JE invariant: only `Electable` nodes can become master.  `Secondary`,
1356        // `Monitor`, and `Arbiter` are not electable and must be rejected at
1357        // the API layer (mirrors JE `ExceptionTest`).  See
1358        // `NodeType::can_be_master`.
1359        if !self.config.node_type.can_be_master() {
1360            return Err(RepError::InvalidStateTransition(format!(
1361                "node '{}' has type {} which is not electable as master",
1362                self.config.node_name.as_str(),
1363                self.config.node_type,
1364            )));
1365        }
1366
1367        // Ensure we can reach Master state (may need Detached -> Unknown first)
1368        self.ensure_unknown_state()?;
1369
1370        let old_state = self.node_state.get_state();
1371        self.node_state.transition_to(NodeState::Master)?;
1372        self.master_tracker.set_master(self.config.node_name.as_str(), term);
1373
1374        // --- F9: spawn Feeder trackers for each known replica -------------
1375        //
1376        // Closes finding F9 of `docs/src/internal/api-audit-2026-05-rep.md`.
1377        // The architecture is pull-based: replicas pull from the master's
1378        // `PEER_FEEDER` service via `catch_up_from_peer`.  However, the
1379        // master must:
1380        //   1. Track each replica via a `Feeder` so AckTracker bookkeeping
1381        //      can attribute replica acks to the right node.
1382        //   2. Push its own writes into `peer_scanner` so replicas pulling
1383        //      from `PEER_FEEDER` actually receive entries (`replicate_entry`).
1384        //
1385        // Here we ensure step 1: every known electable peer in the group
1386        // gets a `Feeder` entry.
1387        {
1388            let mut feeders = self.feeders.write();
1389            // Drop any stale feeders left over from a prior role.  A
1390            // `Feeder` is just an in-memory tracker; recreating it is
1391            // cheap and avoids state inversion bugs across role changes.
1392            feeders.clear();
1393            for peer in self.group_service.get_all_nodes() {
1394                if peer.name == self.config.node_name {
1395                    continue;
1396                }
1397                if peer.node_type != crate::node_type::NodeType::Electable
1398                    && peer.node_type != crate::node_type::NodeType::Secondary
1399                {
1400                    // Arbiters do not receive log entries.
1401                    continue;
1402                }
1403                feeders.push(Feeder::new(peer.name.clone()));
1404                log::debug!(
1405                    "Node '{}' (master, term={}): registered Feeder for \
1406                     replica '{}'",
1407                    self.config.node_name.as_str(),
1408                    term,
1409                    peer.name,
1410                );
1411            }
1412        }
1413
1414        // For observability, log the count.
1415        log::info!(
1416            "Node '{}' became master for term {} \
1417             (feeder trackers: {} known replicas)",
1418            self.config.node_name.as_str(),
1419            term,
1420            self.feeders.read().len(),
1421        );
1422
1423        // -------------------------------------------------------------------
1424
1425        // Notify listeners
1426        self.notify_listeners(old_state, NodeState::Master);
1427
1428        Ok(())
1429    }
1430
1431    /// Transition to replica state with the given master.
1432    ///
1433    /// Transitions this node to Replica state. The node will receive log
1434    /// entries from the specified master.
1435    ///
1436    /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
1437    /// the method prepares an `EnvironmentLogWriter` so that replicated
1438    /// entries can be written to the local log.  The actual network connection
1439    /// is established by the `TcpServiceDispatcher`; this method logs intent.
1440    ///
1441    /// In HA.
1442    pub fn become_replica(&self, master_name: &str) -> Result<()> {
1443        if self.is_shutdown() {
1444            return Err(RepError::StateError(
1445                "Cannot become replica: environment is closed".to_string(),
1446            ));
1447        }
1448
1449        // Ensure we can reach Replica state (may need Detached -> Unknown first)
1450        self.ensure_unknown_state()?;
1451
1452        let old_state = self.node_state.get_state();
1453        self.node_state.transition_to(NodeState::Replica)?;
1454        self.master_tracker.set_master(master_name, 0);
1455        self.replica_stream.set_master(master_name);
1456        self.replica_stream.set_state(
1457            crate::stream::replica_stream::ReplicaStreamState::Connecting,
1458        );
1459
1460        // --- G19: start replica receive loop --------------------------------
1461        //
1462        // Connects to the master's PEER_FEEDER service and runs a
1463        // ReplicaReceiver loop in a background thread.  The receiver writes
1464        // replicated entries via EnvironmentLogWriter.
1465        if let Some(env) = self.env_impl.lock().unwrap().clone() {
1466            if let Some(log_mgr) = env.get_log_manager() {
1467                let vlsn_index =
1468                    Arc::new(crate::vlsn::vlsn_index::VlsnIndex::new(10));
1469
1470                // Resolve the master's socket address from the GroupService.
1471                let master_addr_opt: Option<SocketAddr> = self
1472                    .group_service
1473                    .get_all_nodes()
1474                    .iter()
1475                    .find(|n| n.name == master_name)
1476                    .and_then(|info| {
1477                        format!("{}:{}", info.host, info.port)
1478                            .parse::<SocketAddr>()
1479                            .ok()
1480                    });
1481
1482                let node_name = self.config.node_name.clone();
1483                let master = master_name.to_string();
1484                let vlsn_index_clone = Arc::clone(&vlsn_index);
1485                let shutdown_flag = self.io_shutdown.load(Ordering::SeqCst);
1486                // Wave 9-A fix 2: capture a Weak<Self> so the I/O thread
1487                // can call `bootstrap_via_dispatcher` automatically when
1488                // the master signals `NeedsRestore`.  When the env was
1489                // never registered with `init_self_weak` (raw
1490                // `Arc::new(Self::new(...))` without going through
1491                // `open()` or the test harness), the weak ref is `None`
1492                // and we fall back to operator-driven bootstrap.
1493                let self_weak: Option<Weak<Self>> =
1494                    self.self_weak.get().cloned();
1495
1496                let handle = std::thread::Builder::new()
1497                    .name(format!("noxu-replica-{}", node_name))
1498                    .spawn(move || {
1499                        let mut writer = EnvironmentLogWriter::new(
1500                            log_mgr,
1501                            vlsn_index_clone,
1502                        );
1503
1504                        let Some(addr) = master_addr_opt else {
1505                            log::warn!(
1506                                "noxu-replica-{}: master '{}' address not in RepGroup; \
1507                                 waiting for TCP dispatcher connection",
1508                                node_name, master,
1509                            );
1510                            return;
1511                        };
1512
1513                        // Catch-up loop: catch up, observe NeedsRestore,
1514                        // optionally auto-bootstrap, retry once.  We cap
1515                        // the retry count at MAX_AUTO_BOOTSTRAP_ATTEMPTS
1516                        // (small) so a misbehaving master does not loop
1517                        // forever consuming network bandwidth.
1518                        const MAX_AUTO_BOOTSTRAP_ATTEMPTS: u32 = 2;
1519                        let mut attempts: u32 = 0;
1520                        loop {
1521                            log::info!(
1522                                "noxu-replica-{}: connecting to master '{}' at {}",
1523                                node_name, master, addr,
1524                            );
1525                            match crate::stream::peer_feeder::catch_up_from_peer(
1526                                addr, 0, &mut writer,
1527                            ) {
1528                                Ok(true) => {
1529                                    log::info!(
1530                                        "noxu-replica-{}: catch-up complete from '{}'",
1531                                        node_name, master,
1532                                    );
1533                                    return;
1534                                }
1535                                Ok(false) => {
1536                                    // F2/F4: master signals NeedsRestore.
1537                                    // Wave 9-A fix 2: if a Weak<Self> was
1538                                    // plumbed in, upgrade it and call
1539                                    // `bootstrap_via_dispatcher` ourselves
1540                                    // so the replica auto-bootstraps and
1541                                    // resumes catch-up without operator
1542                                    // intervention.
1543                                    log::warn!(
1544                                        "noxu-replica-{}: master '{}' requires restore",
1545                                        node_name, master,
1546                                    );
1547                                    attempts += 1;
1548                                    if attempts > MAX_AUTO_BOOTSTRAP_ATTEMPTS {
1549                                        log::error!(
1550                                            "noxu-replica-{}: exceeded \
1551                                             auto-bootstrap attempts ({}); giving up",
1552                                            node_name,
1553                                            MAX_AUTO_BOOTSTRAP_ATTEMPTS,
1554                                        );
1555                                        return;
1556                                    }
1557                                    let env_arc = match self_weak
1558                                        .as_ref()
1559                                        .and_then(Weak::upgrade)
1560                                    {
1561                                        Some(e) => e,
1562                                        None => {
1563                                            // No back-ref or env dropped:
1564                                            // fall back to operator-driven
1565                                            // bootstrap and exit cleanly.
1566                                            log::warn!(
1567                                                "noxu-replica-{}: no back-reference \
1568                                                 available; operator must call \
1569                                                 bootstrap_via_dispatcher manually",
1570                                                node_name,
1571                                            );
1572                                            return;
1573                                        }
1574                                    };
1575                                    if env_arc.is_shutdown() {
1576                                        return;
1577                                    }
1578                                    log::info!(
1579                                        "noxu-replica-{}: auto-bootstrapping via \
1580                                         dispatcher from '{}' (attempt {})",
1581                                        node_name, master, attempts,
1582                                    );
1583                                    match env_arc
1584                                        .bootstrap_via_dispatcher(&master)
1585                                    {
1586                                        Ok(()) => {
1587                                            log::info!(
1588                                                "noxu-replica-{}: auto-bootstrap \
1589                                                 succeeded; resuming catch-up",
1590                                                node_name,
1591                                            );
1592                                            // Drop the strong ref before
1593                                            // re-entering catch-up so we
1594                                            // do not keep the env alive
1595                                            // longer than necessary.
1596                                            drop(env_arc);
1597                                            continue;
1598                                        }
1599                                        Err(e) => {
1600                                            log::error!(
1601                                                "noxu-replica-{}: auto-bootstrap \
1602                                                 failed: {}",
1603                                                node_name, e,
1604                                            );
1605                                            return;
1606                                        }
1607                                    }
1608                                }
1609                                Err(e) => {
1610                                    if !shutdown_flag {
1611                                        log::error!(
1612                                            "noxu-replica-{}: error from master '{}': {e}",
1613                                            node_name, master,
1614                                        );
1615                                    }
1616                                    return;
1617                                }
1618                            }
1619                        }
1620                    })
1621                    .expect("failed to spawn noxu-replica thread");
1622
1623                self.io_threads.lock().unwrap().push(handle);
1624
1625                log::debug!(
1626                    "Node '{}': replica receive thread started for master '{}'",
1627                    self.config.node_name.as_str(),
1628                    master_name,
1629                );
1630            } else {
1631                log::warn!(
1632                    "Node '{}': no LogManager available (read-only env?); \
1633                     replica I/O loop not started",
1634                    self.config.node_name.as_str(),
1635                );
1636            }
1637        }
1638        // -------------------------------------------------------------------
1639
1640        // Notify listeners
1641        self.notify_listeners(old_state, NodeState::Replica);
1642
1643        log::info!(
1644            "Node '{}' became replica of master '{}'",
1645            self.config.node_name.as_str(),
1646            master_name
1647        );
1648        Ok(())
1649    }
1650
1651    /// Initiate a master transfer to the target node.
1652    ///
1653    ///
1654    ///
1655    /// Transfers the current master state from this node to one of the
1656    /// electable replicas. The replica that is actually chosen to be the new
1657    /// master is the one with which the Master Transfer can be completed most
1658    /// rapidly. The transfer operation ensures that all changes at this node
1659    /// are available at the new master upon conclusion of the operation.
1660    pub fn transfer_master(&self, config: MasterTransferConfig) -> Result<()> {
1661        if self.is_shutdown() {
1662            return Err(RepError::StateError(
1663                "Cannot transfer master: environment is closed".to_string(),
1664            ));
1665        }
1666
1667        if !self.is_master() {
1668            return Err(RepError::InvalidState(
1669                "Master transfer can only be initiated on the master node"
1670                    .to_string(),
1671            ));
1672        }
1673
1674        log::info!(
1675            "Node '{}' initiating master transfer to '{}'",
1676            self.config.node_name.as_str(),
1677            config.target_node,
1678        );
1679
1680        // Closes finding F7 of `docs/src/internal/api-audit-2026-05-rep.md`.
1681        //
1682        // Steps:
1683        //   1. Locate the target's address.
1684        //   2. Compute the new term (current observed term + 1).
1685        //   3. Send TRANSFER_MASTER to the target — it will become master.
1686        //   4. Send TRANSFER_MASTER (with the same term + new master name) to
1687        //      every other peer so they re-target.
1688        //   5. Demote self to Replica of the target.
1689        //
1690        // The transfer is best-effort: a peer that doesn't ack is logged
1691        // and skipped.  The election driver will reconcile any divergence
1692        // on the next election round.
1693
1694        let target_addr = self
1695            .group_service
1696            .get_all_nodes()
1697            .into_iter()
1698            .find(|n| n.name == config.target_node)
1699            .and_then(|n| {
1700                format!("{}:{}", n.host, n.port)
1701                    .parse::<std::net::SocketAddr>()
1702                    .ok()
1703            })
1704            .ok_or_else(|| {
1705                RepError::ConfigError(format!(
1706                    "transfer_master: target '{}' not registered or has bad address",
1707                    config.target_node
1708                ))
1709            })?;
1710
1711        let new_term = self.master_tracker.get_term().saturating_add(1);
1712
1713        // 1. Tell the target to become master at the new term.
1714        let target_ack = crate::group_admin::send_transfer_master(
1715            target_addr,
1716            &config.target_node,
1717            new_term,
1718        )
1719        .map_err(|e| {
1720            RepError::NetworkError(format!(
1721                "transfer_master: failed to signal target '{}': {}",
1722                config.target_node, e
1723            ))
1724        })?;
1725        if !target_ack {
1726            return Err(RepError::StateError(format!(
1727                "transfer_master: target '{}' rejected the transfer",
1728                config.target_node
1729            )));
1730        }
1731
1732        // 2. Inform all other peers (best-effort).
1733        for peer in self.group_service.get_all_nodes() {
1734            if peer.name == self.config.node_name
1735                || peer.name == config.target_node
1736            {
1737                continue;
1738            }
1739            if let Ok(addr) = format!("{}:{}", peer.host, peer.port).parse() {
1740                let _ = crate::group_admin::send_transfer_master(
1741                    addr,
1742                    &config.target_node,
1743                    new_term,
1744                );
1745            }
1746        }
1747
1748        // 3. Demote self to Replica of the new master.
1749        self.become_replica(&config.target_node)?;
1750
1751        log::info!(
1752            "Node '{}' transferred master to '{}' at term {}",
1753            self.config.node_name.as_str(),
1754            config.target_node,
1755            new_term,
1756        );
1757        Ok(())
1758    }
1759
1760    /// Register a VLSN (as master, after writing a log entry).
1761    ///
1762    /// Maps the given VLSN to the specified log file position. This is called
1763    /// by the master after it writes a replicated log entry.
1764    pub fn register_vlsn(&self, vlsn: u64, file_number: u32, file_offset: u32) {
1765        self.vlsn_index.register(vlsn, file_number, file_offset);
1766    }
1767
1768    /// Replicate a freshly committed log entry from the master.
1769    ///
1770    /// Closes finding F9 of `docs/src/internal/api-audit-2026-05-rep.md`.
1771    ///
1772    /// Combines `register_vlsn` with a push into the in-memory
1773    /// `peer_scanner` so that downstream replicas pulling from this
1774    /// node's `PEER_FEEDER` service (via `catch_up_from_peer`) can
1775    /// stream the entry without round-tripping through the on-disk
1776    /// log.  The local log is still the source of truth; the peer
1777    /// scanner is a fast-path cache that bounds itself via
1778    /// `PeerLogScanner::with_capacity` so old entries are evicted.
1779    ///
1780    /// Should be called by the master after the local commit has
1781    /// fsynced.  Calling on a non-master is harmless (the peer
1782    /// scanner cache is also used by replicas) but is logged at trace
1783    /// level for diagnostics.
1784    pub fn replicate_entry(
1785        &self,
1786        vlsn: u64,
1787        file_number: u32,
1788        file_offset: u32,
1789        entry_type: u8,
1790        data: Vec<u8>,
1791    ) {
1792        self.vlsn_index.register(vlsn, file_number, file_offset);
1793        self.peer_scanner.push(vlsn, entry_type, data);
1794        if !self.is_master() {
1795            log::trace!(
1796                "replicate_entry called on non-master node '{}': vlsn={}, type={}",
1797                self.config.node_name,
1798                vlsn,
1799                entry_type,
1800            );
1801        }
1802    }
1803
1804    /// Apply a replicated entry (as replica).
1805    ///
1806    /// Applies a log entry received from the master. This is called by the
1807    /// replica stream handler after receiving an entry from the feeder.
1808    ///
1809    /// `data` is the wire-encoded log-record payload.  When the
1810    /// replicated environment has not been wired to a local
1811    /// `noxu_db::Environment` (i.e., before `with_environment` is
1812    /// called) the payload is forwarded into the in-memory peer
1813    /// scanner so that downstream replicas attached to the
1814    /// `PEER_FEEDER` service can re-stream it; the local log is **not**
1815    /// updated.  This is documented behaviour rather than a stub — see
1816    /// `api-audit-2026-05-rep.md` finding #26 (medium) for the
1817    /// `with_environment`-required local-apply path.
1818    /// cleanup (rep info F35: `_data` placeholder) renames the leading
1819    /// underscore so reviewers don't read it as a TODO.
1820    pub fn apply_entry(
1821        &self,
1822        vlsn: u64,
1823        entry_type: u8,
1824        data: Vec<u8>,
1825    ) -> Result<()> {
1826        if self.is_shutdown() {
1827            return Err(RepError::StateError(
1828                "Cannot apply entry: environment is closed".to_string(),
1829            ));
1830        }
1831
1832        // Register the VLSN in the index.
1833        self.vlsn_index.register(vlsn, 0, 0);
1834
1835        // Push into the peer log scanner so downstream replicas can
1836        // receive this entry via the PEER_FEEDER service.
1837        self.peer_scanner.push(vlsn, entry_type, data);
1838
1839        log::trace!(
1840            "Applied replicated entry: vlsn={}, type={}",
1841            vlsn,
1842            entry_type
1843        );
1844        Ok(())
1845    }
1846
1847    /// Record an ack from a replica (as master).
1848    ///
1849    /// Records that the specified replica has acknowledged processing up to
1850    /// the given VLSN. This is used by the master to track durability
1851    /// guarantees.
1852    pub fn record_ack(&self, vlsn: u64, replica_name: &str) {
1853        self.ack_tracker.record_ack(vlsn, replica_name);
1854    }
1855
1856    /// Set the state change listener.
1857    ///
1858    ///
1859    ///
1860    /// Sets the listener used to receive asynchronous replication node state
1861    /// change events. Note that there is one listener per replication node,
1862    /// not one per handle. Invoking this method adds to the set of listeners.
1863    ///
1864    /// Invoking this method typically results in an immediate callback to the
1865    /// application via the `on_state_change` method, so that the application
1866    /// is made aware of the existing state of the node at the time the listener
1867    /// is first established.
1868    pub fn set_state_change_listener(
1869        &self,
1870        listener: Arc<dyn StateChangeListener>,
1871    ) {
1872        // Immediately notify the listener of the current state
1873        let current_state = self.node_state.get_state();
1874        let event = StateChangeEvent::new(
1875            current_state,
1876            current_state,
1877            self.get_master_name(),
1878        );
1879        listener.on_state_change(event);
1880
1881        let mut listeners = self.listeners.write();
1882        listeners.push(listener);
1883    }
1884
1885    /// Close the replicated environment.
1886    ///
1887    ///
1888    ///
1889    /// Closes this handle and releases any resources. When closed, daemon
1890    /// threads are stopped, even if they are performing work. The node ceases
1891    /// participation in the replication group. If the node was currently the
1892    /// master, the rest of the group will hold an election.
1893    ///
1894    /// The ReplicatedEnvironment should not be closed while any other type of
1895    /// handle that refers to it is not yet closed.
1896    pub fn close(&self) -> Result<()> {
1897        if self.shutdown.swap(true, Ordering::SeqCst) {
1898            // Already closed
1899            return Ok(());
1900        }
1901
1902        let old_state = self.node_state.get_state();
1903
1904        // Transition to Shutdown state. The state machine allows this from
1905        // any non-Shutdown state.
1906        let _ = self.node_state.transition_to(NodeState::Shutdown);
1907
1908        // Notify listeners of the shutdown
1909        self.notify_listeners(old_state, NodeState::Shutdown);
1910
1911        // Clear feeders
1912        {
1913            let mut feeders = self.feeders.write();
1914            feeders.clear();
1915        }
1916
1917        // Signal and join all I/O threads spawned by become_master /
1918        // become_replica / start_vlsn_persistence_daemon.  The vlsn-flush
1919        // thread does a final flush on its way out so a clean close is
1920        // recoverable.  Closes finding F11.
1921        self.io_shutdown.store(true, Ordering::SeqCst);
1922        {
1923            let mut threads = self.io_threads.lock().unwrap();
1924            for handle in threads.drain(..) {
1925                let _ = handle.join();
1926            }
1927        }
1928
1929        // Belt-and-braces: even when no daemon is running (e.g.
1930        // `ReplicatedEnvironment::new` without `open`), persist a final
1931        // snapshot if env_home is configured.
1932        if let Some(ref home) = self.config.env_home
1933            && let Err(e) =
1934                crate::vlsn::persist::flush_to_disk(&self.vlsn_index, home)
1935        {
1936            log::warn!(
1937                "close: failed to persist VLSN index to {}: {}",
1938                home.display(),
1939                e
1940            );
1941        }
1942
1943        // Stop the service dispatcher (the: serviceDispatcher.shutdown()).
1944        if let Some(ref dispatcher) = self.tcp_dispatcher {
1945            dispatcher.stop();
1946            let kind = if dispatcher.is_tls() { "TLS" } else { "TCP" };
1947            log::debug!(
1948                "Node '{}' {} service dispatcher stopped",
1949                self.config.node_name.as_str(),
1950                kind,
1951            );
1952        }
1953
1954        log::info!(
1955            "Replicated environment '{}' in group '{}' closed",
1956            self.config.node_name.as_str(),
1957            self.config.group_name.as_str()
1958        );
1959
1960        Ok(())
1961    }
1962
1963    /// Close this handle and shut down the Replication Group by forcing all
1964    /// active Replicas to exit.
1965    ///
1966    ///
1967    ///
1968    /// This method must be invoked on the node that's currently the Master
1969    /// after all other outstanding handles have been closed. The Master waits
1970    /// for all active Replicas to catch up so that they have a current set of
1971    /// logs, and then shuts them down.
1972    pub fn shutdown_group(
1973        &self,
1974        replica_shutdown_timeout_ms: u64,
1975    ) -> Result<()> {
1976        if !self.is_master() {
1977            return Err(RepError::InvalidState(
1978                "shutdownGroup must be invoked on the master".to_string(),
1979            ));
1980        }
1981
1982        log::info!(
1983            "Node '{}' shutting down replication group '{}' (replica_timeout={}ms)",
1984            self.config.node_name.as_str(),
1985            self.config.group_name.as_str(),
1986            replica_shutdown_timeout_ms,
1987        );
1988
1989        // Closes finding F8 of `docs/src/internal/api-audit-2026-05-rep.md`.
1990        //
1991        // Send SHUTDOWN_GROUP to every known peer.  The recipient calls
1992        // its own `close()` and the per-connection ADMIN handler
1993        // returns ACK_OK.  Any peer that doesn't ack within the
1994        // timeout is logged and the master proceeds.  After signalling
1995        // every peer, the master closes its own env.
1996        let deadline = std::time::Instant::now()
1997            + Duration::from_millis(replica_shutdown_timeout_ms);
1998
1999        for peer in self.group_service.get_all_nodes() {
2000            if peer.name == self.config.node_name {
2001                continue;
2002            }
2003            // Don't exceed the deadline waiting for any single peer.
2004            let now = std::time::Instant::now();
2005            if now >= deadline {
2006                log::warn!(
2007                    "shutdown_group: deadline reached; skipping remaining peers"
2008                );
2009                break;
2010            }
2011            let addr_str = format!("{}:{}", peer.host, peer.port);
2012            let addr = match addr_str.parse::<SocketAddr>() {
2013                Ok(a) => a,
2014                Err(e) => {
2015                    log::warn!(
2016                        "shutdown_group: peer '{}' has bad address {}: {}",
2017                        peer.name,
2018                        addr_str,
2019                        e
2020                    );
2021                    continue;
2022                }
2023            };
2024            match crate::group_admin::send_shutdown_group(addr) {
2025                Ok(true) => log::info!(
2026                    "shutdown_group: peer '{}' acknowledged",
2027                    peer.name
2028                ),
2029                Ok(false) => log::warn!(
2030                    "shutdown_group: peer '{}' rejected the request",
2031                    peer.name
2032                ),
2033                Err(e) => log::warn!(
2034                    "shutdown_group: peer '{}' unreachable: {}",
2035                    peer.name,
2036                    e
2037                ),
2038            }
2039        }
2040
2041        // Master closes itself last.
2042        self.close()
2043    }
2044
2045    /// Check if shutdown is in progress.
2046    pub fn is_shutdown(&self) -> bool {
2047        self.shutdown.load(Ordering::SeqCst)
2048    }
2049
2050    /// Notify all registered listeners of a state change.
2051    fn notify_listeners(&self, old_state: NodeState, new_state: NodeState) {
2052        let listeners = self.listeners.read();
2053        if !listeners.is_empty() {
2054            let event = StateChangeEvent::new(
2055                old_state,
2056                new_state,
2057                self.get_master_name(),
2058            );
2059            for listener in listeners.iter() {
2060                listener.on_state_change(event.clone());
2061            }
2062        }
2063    }
2064}
2065
2066// ---------------------------------------------------------------------------
2067// F1: ReplicaAckCoordinator impl wires master commits into the AckTracker.
2068// ---------------------------------------------------------------------------
2069//
2070// `noxu_db::Transaction::commit_with_durability` calls
2071// `await_replica_acks` after the local WAL fsync.  This impl:
2072//
//   1. Rejects calls during shutdown with `Shutdown`.
//   2. Rejects calls on a non-master node with `NotMaster`.
2075//   3. Computes the required ack count from `electable_count` and the
2076//      requested policy.
2077//   4. Allocates a unique commit sequence number, registers the ack
2078//      requirement on the `AckTracker`, and polls `is_satisfied` with
2079//      a small sleep until either the timeout elapses or the policy
2080//      is satisfied.
2081//   5. Cleans up the tracker entry on every exit path.
2082//
2083// Closes finding F1 of `docs/src/internal/api-audit-2026-05-rep.md`.
2084impl ReplicaAckCoordinator for ReplicatedEnvironment {
2085    fn await_replica_acks(
2086        &self,
2087        policy: ReplicaAckPolicyKind,
2088        timeout: Duration,
2089    ) -> std::result::Result<u32, AckWaitError> {
2090        // Fast-path: ReplicaAckPolicy::None never blocks. The trait spec
2091        // says callers may already short-circuit, but be defensive.
2092        if matches!(policy, ReplicaAckPolicyKind::None) {
2093            return Ok(0);
2094        }
2095
2096        if self.is_shutdown() {
2097            return Err(AckWaitError {
2098                kind: AckWaitErrorKind::Shutdown,
2099                needed: 0,
2100                received: 0,
2101            });
2102        }
2103
2104        if !self.is_master() {
2105            return Err(AckWaitError {
2106                kind: AckWaitErrorKind::NotMaster,
2107                needed: 0,
2108                received: 0,
2109            });
2110        }
2111
2112        // Count electable peers (excluding the master) using the
2113        // RepGroup view, which counts Arbiters and Electables
2114        // identically. Only Electable nodes are counted as data
2115        // replicas able to ack a commit.  The master itself is
2116        // *implicit*: it is not registered in `group_service` (only
2117        // peers are), so we add 1 to obtain the total electable
2118        // count expected by `ReplicaAckPolicyKind::required_acks`.
2119        let group = self.get_rep_group();
2120        let electable_peers: u32 = group
2121            .get_nodes()
2122            .iter()
2123            .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2124            .count() as u32;
2125        let electable_count: u32 = electable_peers + 1; // +1 for self/master
2126
2127        let needed = policy.required_acks(electable_count);
2128        if needed == 0 {
2129            // Single-node group, or All with only the master itself.
2130            return Ok(0);
2131        }
2132
2133        let commit_seq = self
2134            .commit_ack_seq
2135            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2136        self.ack_tracker.register(commit_seq, needed);
2137
2138        // Poll-with-sleep loop. The poll interval is small enough that
2139        // late acks satisfy the policy promptly, and large enough that
2140        // a single commit waiting on a slow replica does not spin a
2141        // CPU.
2142        let poll_interval =
2143            std::cmp::min(timeout / 50, Duration::from_millis(20));
2144        let poll_interval = if poll_interval.is_zero() {
2145            Duration::from_millis(1)
2146        } else {
2147            poll_interval
2148        };
2149        let deadline = std::time::Instant::now() + timeout;
2150
2151        loop {
2152            if self.ack_tracker.is_satisfied(commit_seq) {
2153                self.ack_tracker.cleanup_through(commit_seq);
2154                return Ok(needed);
2155            }
2156            if self.is_shutdown() {
2157                self.ack_tracker.cleanup_through(commit_seq);
2158                return Err(AckWaitError {
2159                    kind: AckWaitErrorKind::Shutdown,
2160                    needed,
2161                    received: 0,
2162                });
2163            }
2164            let now = std::time::Instant::now();
2165            if now >= deadline {
2166                // Tear down the registration so it doesn't accumulate;
2167                // record the partial ack count so the caller can report
2168                // a useful `InsufficientReplicas { required, available }`.
2169                let received =
2170                    self.ack_tracker.received_count(commit_seq).unwrap_or(0);
2171                self.ack_tracker.cleanup_through(commit_seq);
2172                return Err(AckWaitError {
2173                    kind: AckWaitErrorKind::Timeout,
2174                    needed,
2175                    received,
2176                });
2177            }
2178            let sleep_for = std::cmp::min(
2179                poll_interval,
2180                deadline.saturating_duration_since(now),
2181            );
2182            std::thread::sleep(sleep_for);
2183        }
2184    }
2185
2186    /// X-3: allocate the next VLSN for a recovered XA commit and register
2187    /// `lsn` in the VLSN index so feeders can stream the commit.
2188    ///
2189    /// Increments off the current latest VLSN so the new VLSN is strictly
2190    /// monotonically increasing.  In a single-node or master-less environment
2191    /// (not master) returns 0 (NULL_VLSN — harmless, the default).
2192    fn alloc_vlsn_for_recovered_commit(&self, lsn: noxu_util::Lsn) -> u64 {
2193        // Only allocate a VLSN when we are the master; on a replica the
2194        // recovered XA should have been replicated by the original master.
2195        if !self.is_master() {
2196            return 0;
2197        }
2198        let next_vlsn = self.vlsn_index.get_latest_vlsn() + 1;
2199        self.vlsn_index.register(
2200            next_vlsn,
2201            lsn.file_number(),
2202            lsn.file_offset(),
2203        );
2204        log::debug!(
2205            "alloc_vlsn_for_recovered_commit: allocated vlsn={} for lsn={:?}",
2206            next_vlsn,
2207            lsn
2208        );
2209        next_vlsn
2210    }
2211
2212    /// R-3: pre-allocate the next commit VLSN WITHOUT registering in the index.
2213    ///
2214    /// The caller writes the `TxnCommit` WAL entry with this VLSN embedded,
2215    /// then calls `register_recovered_commit_vlsn` with the actual commit LSN.
2216    /// This two-step approach ensures the WAL entry carries the VLSN so the
2217    /// X-14 VLSN rebuild on second crash can find it.
2218    fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64 {
2219        if !self.is_master() {
2220            return 0;
2221        }
2222        // Peek at the next VLSN without registering.  The actual registration
2223        // happens in register_recovered_commit_vlsn() after the WAL write.
2224        self.vlsn_index.get_latest_vlsn() + 1
2225    }
2226
2227    /// R-3: register a pre-allocated VLSN in the VLSN index with the actual
2228    /// commit LSN.  Called after writing the `TxnCommit` WAL entry.
2229    fn register_recovered_commit_vlsn(
2230        &self,
2231        vlsn: u64,
2232        commit_lsn: noxu_util::Lsn,
2233    ) {
2234        if vlsn == 0 || !self.is_master() {
2235            return;
2236        }
2237        self.vlsn_index.register(
2238            vlsn,
2239            commit_lsn.file_number(),
2240            commit_lsn.file_offset(),
2241        );
2242        log::debug!(
2243            "register_recovered_commit_vlsn: registered vlsn={} for commit_lsn={:?}",
2244            vlsn,
2245            commit_lsn
2246        );
2247    }
2248}
2249
2250#[cfg(test)]
2251mod tests {
2252    use super::*;
2253    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};
2254
2255    /// Helper to create a test config with a fixed port (unit-test style,
2256    /// no real TCP bind needed — hostname "localhost" resolves but the port
2257    /// might be in use; use `test_config_port0` for real TCP tests).
2258    fn test_config(node_name: &str) -> RepConfig {
2259        RepConfig::builder("test_group", node_name, "localhost")
2260            .node_port(5001)
2261            .build()
2262    }
2263
2264    /// Helper to create a test config that binds to an OS-assigned port.
2265    fn test_config_port0(node_name: &str) -> RepConfig {
2266        RepConfig::builder("test_group", node_name, "127.0.0.1")
2267            .node_port(0)
2268            .build()
2269    }
2270
2271    #[test]
2272    fn test_initial_state_is_detached() {
2273        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2274        // NodeStateMachine starts in Detached state
2275        assert_eq!(env.get_state(), NodeState::Detached);
2276        assert!(!env.is_master());
2277        assert!(!env.is_replica());
2278        assert!(!env.is_active());
2279    }
2280
2281    #[test]
2282    fn test_become_master() {
2283        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2284        env.become_master(1).unwrap();
2285        assert_eq!(env.get_state(), NodeState::Master);
2286        assert!(env.is_master());
2287        assert!(!env.is_replica());
2288        assert!(env.is_active());
2289    }
2290
2291    #[test]
2292    fn test_become_replica() {
2293        let env = ReplicatedEnvironment::new(test_config("node2")).unwrap();
2294        env.become_replica("node1").unwrap();
2295        assert_eq!(env.get_state(), NodeState::Replica);
2296        assert!(!env.is_master());
2297        assert!(env.is_replica());
2298        assert!(env.is_active());
2299    }
2300
2301    #[test]
2302    fn test_get_node_name() {
2303        let env = ReplicatedEnvironment::new(test_config("my_node")).unwrap();
2304        assert_eq!(env.get_node_name(), "my_node");
2305    }
2306
2307    #[test]
2308    fn test_get_group_name() {
2309        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2310        assert_eq!(env.get_group_name(), "test_group");
2311    }
2312
2313    #[test]
2314    fn test_register_vlsn_updates_index() {
2315        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2316        env.register_vlsn(1, 0, 100);
2317        env.register_vlsn(2, 0, 200);
2318        env.register_vlsn(3, 0, 300);
2319
2320        assert_eq!(env.get_current_vlsn(), 3);
2321        let range = env.get_vlsn_range();
2322        assert_eq!(range.first(), 1);
2323        assert_eq!(range.last(), 3);
2324    }
2325
2326    #[test]
2327    fn test_record_ack() {
2328        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
2329        env.become_master(1).unwrap();
2330
2331        env.register_vlsn(1, 0, 100);
2332        // Register a pending ack requirement, then record ack
2333        env.get_ack_tracker().register(1, 1);
2334        env.record_ack(1, "replica1");
2335        // Ack should be satisfied
2336        assert!(env.get_ack_tracker().is_satisfied(1));
2337    }
2338
2339    #[test]
2340    fn test_close_sets_shutdown() {
2341        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2342        assert!(!env.is_shutdown());
2343
2344        env.close().unwrap();
2345        assert!(env.is_shutdown());
2346        // After close, state should be Shutdown
2347        assert_eq!(env.get_state(), NodeState::Shutdown);
2348    }
2349
2350    #[test]
2351    fn test_close_is_idempotent() {
2352        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2353        env.close().unwrap();
2354        env.close().unwrap(); // Should not error
2355        assert!(env.is_shutdown());
2356    }
2357
2358    #[test]
2359    fn test_cannot_become_master_when_shutdown() {
2360        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2361        env.close().unwrap();
2362
2363        let result = env.become_master(1);
2364        assert!(result.is_err());
2365    }
2366
2367    #[test]
2368    fn test_cannot_become_replica_when_shutdown() {
2369        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2370        env.close().unwrap();
2371
2372        let result = env.become_replica("master");
2373        assert!(result.is_err());
2374    }
2375
2376    #[test]
2377    fn test_cannot_apply_entry_when_shutdown() {
2378        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2379        env.close().unwrap();
2380
2381        let result = env.apply_entry(1, 0, vec![1, 2, 3]);
2382        assert!(result.is_err());
2383    }
2384
2385    #[test]
2386    fn test_cannot_transfer_master_when_not_master() {
2387        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2388        env.become_replica("other").unwrap();
2389
2390        let config = MasterTransferConfig::new(
2391            "target_node".to_string(),
2392            Duration::from_secs(30),
2393        );
2394        let result = env.transfer_master(config);
2395        assert!(result.is_err());
2396    }
2397
2398    #[test]
2399    fn test_transfer_master_requires_registered_target() {
2400        // F7: transfer_master is no longer a no-op; it sends an ADMIN
2401        // TRANSFER_MASTER signal to the target via TCP.  An unregistered
2402        // target is rejected at the address-resolution step.
2403        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2404        env.become_master(1).unwrap();
2405
2406        let config = MasterTransferConfig::new(
2407            "unknown_target".to_string(),
2408            Duration::from_secs(30),
2409        );
2410        let result = env.transfer_master(config);
2411        assert!(
2412            result.is_err(),
2413            "transfer_master to unregistered target must error"
2414        );
2415    }
2416
2417    #[test]
2418    fn test_apply_entry_registers_vlsn() {
2419        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2420        env.become_replica("master").unwrap();
2421
2422        env.apply_entry(1, 0, vec![1, 2, 3]).unwrap();
2423        env.apply_entry(2, 0, vec![4, 5, 6]).unwrap();
2424
2425        assert_eq!(env.get_current_vlsn(), 2);
2426    }
2427
2428    #[test]
2429    fn test_master_name_tracking() {
2430        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2431
2432        // Initially no master known
2433        assert!(env.get_master_name().is_none());
2434
2435        // After becoming master, this node is the master
2436        env.become_master(1).unwrap();
2437        assert_eq!(env.get_master_name(), Some("node1".to_string()));
2438    }
2439
2440    #[test]
2441    fn test_master_to_replica_transition() {
2442        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2443
2444        // Become master first
2445        env.become_master(1).unwrap();
2446        assert_eq!(env.get_master_name(), Some("node1".to_string()));
2447
2448        // Transition to replica (Master -> Replica is valid)
2449        env.become_replica("other_master").unwrap();
2450        assert_eq!(env.get_master_name(), Some("other_master".to_string()));
2451        assert!(env.is_replica());
2452    }
2453
2454    #[test]
2455    fn test_state_change_listener_notification() {
2456        struct TestListener {
2457            call_count: AtomicU32,
2458            last_new_state: noxu_sync::Mutex<Option<NodeState>>,
2459        }
2460
2461        impl StateChangeListener for TestListener {
2462            fn on_state_change(&self, event: StateChangeEvent) {
2463                self.call_count.fetch_add(1, AtomicOrdering::SeqCst);
2464                *self.last_new_state.lock() = Some(event.new_state);
2465            }
2466        }
2467
2468        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2469        let listener = Arc::new(TestListener {
2470            call_count: AtomicU32::new(0),
2471            last_new_state: noxu_sync::Mutex::new(None),
2472        });
2473
2474        // Setting the listener should trigger an immediate notification
2475        env.set_state_change_listener(listener.clone());
2476        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 1);
2477
2478        // State change should trigger another notification
2479        env.become_master(1).unwrap();
2480        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 2);
2481        assert_eq!(*listener.last_new_state.lock(), Some(NodeState::Master));
2482    }
2483
2484    #[test]
2485    fn test_close_notifies_listeners() {
2486        struct ShutdownListener {
2487            shutdown_seen: AtomicBool,
2488        }
2489
2490        impl StateChangeListener for ShutdownListener {
2491            fn on_state_change(&self, event: StateChangeEvent) {
2492                if event.new_state == NodeState::Shutdown {
2493                    self.shutdown_seen.store(true, AtomicOrdering::SeqCst);
2494                }
2495            }
2496        }
2497
2498        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2499        let listener = Arc::new(ShutdownListener {
2500            shutdown_seen: AtomicBool::new(false),
2501        });
2502
2503        // The initial notification is for the current (Detached) state
2504        env.set_state_change_listener(listener.clone());
2505
2506        // Become master first so the close transition is meaningful
2507        env.become_master(1).unwrap();
2508        assert!(!listener.shutdown_seen.load(AtomicOrdering::SeqCst));
2509
2510        env.close().unwrap();
2511        assert!(listener.shutdown_seen.load(AtomicOrdering::SeqCst));
2512    }
2513
2514    #[test]
2515    fn test_shutdown_group_requires_master() {
2516        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2517        env.become_replica("other").unwrap();
2518
2519        let result = env.shutdown_group(5000);
2520        assert!(result.is_err());
2521    }
2522
2523    #[test]
2524    fn test_shutdown_group_as_master() {
2525        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2526        env.become_master(1).unwrap();
2527
2528        let result = env.shutdown_group(5000);
2529        assert!(result.is_ok());
2530        assert!(env.is_shutdown());
2531    }
2532
2533    #[test]
2534    fn test_get_config() {
2535        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2536        assert_eq!(env.get_config().node_name, "node1");
2537        assert_eq!(env.get_config().group_name, "test_group");
2538    }
2539
2540    #[test]
2541    fn test_get_stats() {
2542        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2543        let _stats = env.get_stats();
2544        // Just verify we can access stats without panicking
2545    }
2546
2547    // -----------------------------------------------------------------------
2548    // TCP dispatcher tests (H-5 / H-7)
2549    // -----------------------------------------------------------------------
2550
2551    #[test]
2552    fn test_tcp_dispatcher_starts_on_new() {
2553        // Use port 0 so the OS assigns an ephemeral port.
2554        let env =
2555            ReplicatedEnvironment::new(test_config_port0("tcp_node")).unwrap();
2556        // The dispatcher must have started and bound a real port.
2557        let addr = env.bound_addr();
2558        assert!(addr.is_some(), "expected a bound address");
2559        let addr = addr.unwrap();
2560        assert_ne!(addr.port(), 0, "OS should assign a non-zero port");
2561    }
2562
2563    #[test]
2564    fn test_tcp_dispatcher_stops_on_close() {
2565        let env =
2566            ReplicatedEnvironment::new(test_config_port0("tcp_node2")).unwrap();
2567        // Dispatcher is running.
2568        assert!(
2569            env.tcp_dispatcher
2570                .as_ref()
2571                .map(|d| d.is_running())
2572                .unwrap_or(false)
2573        );
2574
2575        env.close().unwrap();
2576
2577        // After close, dispatcher must be stopped.
2578        assert!(
2579            !env.tcp_dispatcher
2580                .as_ref()
2581                .map(|d| d.is_running())
2582                .unwrap_or(false),
2583            "dispatcher should be stopped after close"
2584        );
2585    }
2586
2587    #[test]
2588    fn test_tcp_dispatcher_accepts_connection() {
2589        use crate::net::Channel;
2590        use crate::net::ServiceHandler;
2591        use crate::net::service_dispatcher::connect_to_service;
2592        use std::sync::atomic::{AtomicU32, Ordering as AO};
2593        use std::time::Duration;
2594
2595        struct PingHandler {
2596            count: AtomicU32,
2597        }
2598        impl ServiceHandler for PingHandler {
2599            fn service_name(&self) -> &str {
2600                "ping"
2601            }
2602            fn handle(&self, ch: Box<dyn Channel>) -> crate::error::Result<()> {
2603                self.count.fetch_add(1, AO::SeqCst);
2604                // Echo the first message back.
2605                if let Ok(Some(msg)) = ch.receive(Duration::from_secs(2)) {
2606                    let _ = ch.send(&msg);
2607                }
2608                Ok(())
2609            }
2610        }
2611
2612        let env =
2613            ReplicatedEnvironment::new(test_config_port0("tcp_node3")).unwrap();
2614        let addr = env.bound_addr().expect("dispatcher must be bound");
2615
2616        // Register a ping handler on the running dispatcher.
2617        if let Some(ref disp) = env.tcp_dispatcher {
2618            let handler = Arc::new(PingHandler { count: AtomicU32::new(0) });
2619            disp.register("ping", handler.clone());
2620
2621            // Give the accept thread a moment.
2622            std::thread::sleep(Duration::from_millis(20));
2623
2624            let client = connect_to_service(addr, "ping").unwrap();
2625            client.send(b"hello").unwrap();
2626            let reply = client.receive(Duration::from_secs(2)).unwrap();
2627            assert_eq!(reply, Some(b"hello".to_vec()));
2628
2629            assert_eq!(handler.count.load(AO::SeqCst), 1);
2630        }
2631
2632        env.close().unwrap();
2633    }
2634
2635    #[test]
2636    fn test_become_master_auto_transitions_from_detached() {
2637        // The state machine requires Detached -> Unknown -> Master.
2638        // become_master() should handle this automatically.
2639        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2640        assert_eq!(env.get_state(), NodeState::Detached);
2641        env.become_master(1).unwrap();
2642        assert_eq!(env.get_state(), NodeState::Master);
2643    }
2644
2645    #[test]
2646    fn test_become_replica_auto_transitions_from_detached() {
2647        // The state machine requires Detached -> Unknown -> Replica.
2648        // become_replica() should handle this automatically.
2649        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2650        assert_eq!(env.get_state(), NodeState::Detached);
2651        env.become_replica("master_node").unwrap();
2652        assert_eq!(env.get_state(), NodeState::Replica);
2653    }
2654
2655    #[test]
2656    fn test_cannot_transfer_master_when_shutdown() {
2657        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2658        env.become_master(1).unwrap();
2659        env.close().unwrap();
2660
2661        let config = MasterTransferConfig::new(
2662            "target".to_string(),
2663            Duration::from_secs(30),
2664        );
2665        let result = env.transfer_master(config);
2666        assert!(result.is_err());
2667    }
2668
2669    #[test]
2670    fn test_full_lifecycle() {
2671        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
2672
2673        // Start as detached
2674        assert_eq!(env.get_state(), NodeState::Detached);
2675
2676        // Become master
2677        env.become_master(1).unwrap();
2678        assert!(env.is_master());
2679
2680        // Register some VLSNs
2681        env.register_vlsn(1, 0, 100);
2682        env.register_vlsn(2, 0, 200);
2683
2684        // Record ack from replica
2685        env.record_ack(1, "replica1");
2686        env.record_ack(2, "replica1");
2687
2688        // Transition to replica (simulating failover)
2689        env.become_replica("node2").unwrap();
2690        assert!(env.is_replica());
2691
2692        // Apply entries from new master
2693        env.apply_entry(3, 0, vec![7, 8, 9]).unwrap();
2694
2695        // Close
2696        env.close().unwrap();
2697        assert!(env.is_shutdown());
2698    }
2699
2700    /// Verify that `with_environment` lazily registers the RESTORE service on
2701    /// the TCP dispatcher when `config.env_home` was not set at construction.
2702    ///
2703    /// This mirrors`RepNode.envSetup()` which registers the restore handler
2704    /// when the environment is wired into the replicated node.
2705    #[test]
2706    fn test_restore_registered_lazily_via_with_environment() {
2707        use noxu_dbi::EnvironmentImpl;
2708        use tempfile::TempDir;
2709
2710        let dir = TempDir::new().expect("temp dir");
2711
2712        // Build config WITHOUT env_home — dispatcher starts, but no RESTORE handler yet.
2713        let config = RepConfig::builder("test_group", "node1", "127.0.0.1")
2714            .node_port(0)
2715            .build();
2716
2717        let rep_env = ReplicatedEnvironment::new(config).unwrap();
2718
2719        // Not yet registered.
2720        assert!(
2721            !rep_env
2722                .restore_registered
2723                .load(std::sync::atomic::Ordering::SeqCst)
2724        );
2725
2726        // Wire in a real EnvironmentImpl so get_env_home() returns the temp dir.
2727        let env_impl = Arc::new(
2728            EnvironmentImpl::new(dir.path(), false, false).expect("open env"),
2729        );
2730        rep_env.with_environment(env_impl);
2731
2732        // Now the RESTORE service must be registered.
2733        assert!(
2734            rep_env
2735                .restore_registered
2736                .load(std::sync::atomic::Ordering::SeqCst)
2737        );
2738    }
2739
2740    /// Verify that when `config.env_home` IS set at construction, the RESTORE
2741    /// service is registered immediately (not deferred).
2742    #[test]
2743    fn test_restore_registered_eagerly_when_env_home_in_config() {
2744        use tempfile::TempDir;
2745
2746        let dir = TempDir::new().expect("temp dir");
2747
2748        let config = RepConfig::builder("test_group", "node2", "127.0.0.1")
2749            .node_port(0)
2750            .env_home(dir.path())
2751            .build();
2752
2753        let rep_env = ReplicatedEnvironment::new(config).unwrap();
2754
2755        // Should be registered immediately (env_home was in config).
2756        assert!(
2757            rep_env
2758                .restore_registered
2759                .load(std::sync::atomic::Ordering::SeqCst)
2760        );
2761    }
2762}