Skip to main content

noxu_rep/
replicated_environment.rs

1//! The main replicated environment API.
2//!
3//!
4//! A replicated database environment that is a node in a replication group.
5//! This is the entry point for replication. It wraps a standard Environment
6//! and adds replication capabilities including master election, replica
7//! streaming, and commit acknowledgments.
8//!
9//! # Replication node states
10//!
11//! The replication node state determines the operations that the application
12//! can perform against its replicated environment. The state transitions
13//! visible to the application can be summarized by the regular expression:
14//!
15//! ```text
16//! [ MASTER | REPLICA | UNKNOWN ]+ DETACHED
17//! ```
18//!
19//! When the first handle to a `ReplicatedEnvironment` is created and the node
20//! is brought up, the node usually establishes Master or Replica state. These
21//! states are preceded by the Unknown state. As various remote nodes become
22//! unavailable and elections are held, the local node may change between
23//! Master and Replica states, always with a (usually brief) transition through
24//! Unknown state.
25//!
26//! When the environment is closed, the node transitions to the Detached state.
27
28use noxu_dbi::{
29    AckWaitError, AckWaitErrorKind, EnvironmentImpl, ReplicaAckCoordinator,
30    ReplicaAckPolicyKind,
31};
32use noxu_sync::RwLock;
33use std::net::SocketAddr;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::OnceLock;
37use std::sync::Weak;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use crate::ack_tracker::AckTracker;
42use crate::elections::election_service::{
43    ELECTION_SERVICE_NAME, ElectionAcceptorState, ElectionService,
44};
45use crate::elections::master_tracker::MasterTracker;
46use crate::error::{RepError, Result};
47use crate::group_service::GroupService;
48use crate::master_transfer::MasterTransferConfig;
49use crate::net::service_dispatcher::TcpServiceDispatcher;
50use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
51use crate::network_restore_server::{
52    NetworkRestoreServer, RESTORE_SERVICE_NAME,
53};
54use crate::node_state::{NodeState, NodeStateMachine};
55use crate::rep_config::RepConfig;
56use crate::rep_stats::RepStats;
57use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
58use crate::stream::feeder::Feeder;
59use crate::stream::peer_feeder::{
60    PEER_FEEDER_SERVICE_NAME, PeerFeederService, PeerLogScanner,
61};
62use crate::stream::replica_stream::{EnvironmentLogWriter, ReplicaStream};
63use crate::vlsn::vlsn_index::VlsnIndex;
64use crate::vlsn::vlsn_range::VlsnRange;
65
/// Default heartbeat timeout for master liveness detection.
///
/// Handed to `MasterTracker::new` by `ReplicatedEnvironment::new`; the
/// constructor always uses this fixed 30-second value rather than reading a
/// timeout from `RepConfig`.
const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);
68
/// A replicated database environment.
///
/// This is the entry point for replication. It wraps a standard Environment
/// and adds replication capabilities including master election, replica
/// streaming, and commit acknowledgments.
///
/// High Availability (HA) provides a replicated, embedded database
/// management system which provides fast, reliable, and scalable data
/// management. HA enables replication of an environment across a Replication
/// Group. A `ReplicatedEnvironment` is a single node in the replication group.
///
/// `ReplicatedEnvironment` wraps a standard `Environment`. All database
/// operations are executed in the same fashion in both replicated and
/// non-replicated applications. A `ReplicatedEnvironment` must be
/// transactional. All replicated databases created in the replicated
/// environment must be transactional as well.
///
/// A `ReplicatedEnvironment` joins its replication group when it is opened.
///
/// NOTE(review): `new()` on its own only builds node-local state and starts
/// the TCP service dispatcher; the election driver that resolves the node
/// into Master or Replica state is started by `open()`.  Prefer `open()`
/// unless state transitions are driven explicitly by the caller.
///
/// Replicated environments can be created with node type Electable or
/// Secondary. Electable nodes can be masters or replicas, and participate in
/// both master elections and commit durability decisions. Secondary nodes can
/// only be replicas, not masters, and do not participate in either elections or
/// durability decisions.
///
/// # Example
///
/// ```ignore
/// use noxu_rep::{ReplicatedEnvironment, RepConfig};
///
/// let config = RepConfig::builder("my_group", "node1", "localhost")
///     .node_port(5001)
///     .build();
/// let rep_env = ReplicatedEnvironment::new(config).unwrap();
/// ```
pub struct ReplicatedEnvironment {
    /// The replication configuration for this node.
    config: RepConfig,
    /// Tracks the current node state (Detached, Unknown, Master, Replica).
    node_state: NodeStateMachine,
    /// Service for managing the replication group membership.
    group_service: GroupService,
    /// Maps VLSNs to log file positions.
    ///
    /// Wrapped in `Arc` so that background daemons (election driver,
    /// VLSN-index persistence flusher) can share access without
    /// borrowing the env.  Closes finding F11 in
    /// `docs/src/internal/api-audit-2026-05-rep.md`.
    vlsn_index: Arc<VlsnIndex>,
    /// Tracks acknowledgments from replicas (used by master).
    ack_tracker: AckTracker,
    /// Replication statistics.
    stats: RepStats,
    /// Active feeder threads (master -> replica streams).
    feeders: RwLock<Vec<Feeder>>,
    /// Replica stream for receiving updates from the master.
    replica_stream: ReplicaStream,
    /// Tracks the current master node.
    master_tracker: MasterTracker,
    /// State change listeners.
    listeners: RwLock<Vec<Arc<dyn StateChangeListener>>>,
    /// Shutdown flag.
    shutdown: AtomicBool,
    /// TCP service dispatcher — listens on the replication port and routes
    /// incoming connections to the appropriate service handler (feeder, etc.).
    ///
    /// Started in `new()`.  `None` when no usable listen address could be
    /// derived from the configured host/port, or when the dispatcher could
    /// not be created or started; `new()` logs a warning and carries on in
    /// all of those cases.
    tcp_dispatcher: Option<TcpServiceDispatcher>,
    /// The address the `tcp_dispatcher` is actually bound to (may differ from
    /// the configured port when port 0 is used in tests).  `None` exactly
    /// when `tcp_dispatcher` is `None`.
    bound_addr: Option<SocketAddr>,

    /// Optional live `EnvironmentImpl` wired in via [`with_environment`].
    ///
    /// When set, `become_master` spawns a `FeederRunner` per replica using
    /// `EnvironmentLogScanner`, and `become_replica` spawns a
    /// `ReplicaReceiver` thread using `EnvironmentLogWriter`.
    ///
    /// The VLSN persistence daemon also reads it to look up the last
    /// checkpoint end LSN used to cap each flush.
    env_impl: StdMutex<Option<Arc<EnvironmentImpl>>>,

    /// Background I/O thread handles spawned during state transitions and by
    /// the `start_*` daemon methods.
    ///
    /// Stored so that `close()` can join them cleanly.  The election driver
    /// and VLSN persistence daemon are also recognised here by thread name,
    /// which is how their `start_*` methods detect an already-running thread.
    io_threads: StdMutex<Vec<std::thread::JoinHandle<()>>>,

    /// Shutdown flag shared with I/O threads so they terminate when the
    /// environment is closed.
    io_shutdown: AtomicBool,

    /// Whether the RESTORE service has been registered on the TCP dispatcher.
    ///
    /// When `config.env_home` is `None` at construction time, registration is
    /// deferred until `with_environment()` provides the env home path.
    restore_registered: AtomicBool,

    /// In-memory log queue used by the peer feeder service.
    ///
    /// When this node is a replica, `apply_entry()` pushes each received log
    /// entry here.  The `PeerFeederService` registered on the TCP dispatcher
    /// reads from this queue to stream entries to downstream replicas that
    /// are behind this node (peer-to-peer log distribution, HA style).
    peer_scanner: Arc<PeerLogScanner>,

    /// Monotonic sequence used by `await_replica_acks` to assign unique
    /// keys to in-flight commits awaiting replica acknowledgment.  In
    /// production this should track the real master VLSN; until F11
    /// closes the VLSN<->commit linkage, the coordinator uses a
    /// synthetic sequence so that ack tracking is unique per commit.
    /// See finding F1 in `docs/src/internal/api-audit-2026-05-rep.md`.
    commit_ack_seq: std::sync::atomic::AtomicU64,

    /// Shared acceptor state used by the ELECTION service handler.
    /// The election driver updates `own_vlsn` / `own_term` here as the
    /// node progresses; incoming acceptor sessions read it on every
    /// connection so their replies always reflect the local node's
    /// most recent state.  Closes finding F6.
    election_state: Arc<ElectionAcceptorState>,

    /// Self-referential `Weak` populated once the env has been wrapped
    /// in an `Arc`.  Used by the replica I/O thread spawned in
    /// `become_replica` so it can call `bootstrap_via_dispatcher` when
    /// the master signals `NeedsRestore`.
    ///
    /// Populated lazily via [`Self::init_self_weak`] from `open()` and
    /// the test harness.  When unset (callers that build the env via
    /// raw `Arc::new(Self::new(...))` and never call `init_self_weak`)
    /// the I/O thread falls back to operator-driven bootstrap.
    self_weak: OnceLock<Weak<Self>>,
}
206
207impl ReplicatedEnvironment {
208    /// Create a new replicated environment.
209    ///
210    ///
211    ///
212    /// Creates a replicated environment handle and starts participating in the
213    /// replication group. The node's state is determined when it joins the
214    /// group, and mastership is not preconfigured. If the group has no current
215    /// master, creation will trigger an election to determine whether this node
216    /// will participate as a Master or a Replica.
217    ///
218    /// A brand new node will always join an existing group as a Replica, unless
219    /// it is the very first electable node that is creating the group. In that
220    /// case it joins as the Master of the newly formed singleton group.
221    pub fn new(config: RepConfig) -> Result<Self> {
222        let node_state = NodeStateMachine::new();
223        let group_service = GroupService::new(config.group_name.clone());
224        let vlsn_index = {
225            // F11: try to load a previously persisted vlsn.idx from
226            // env_home if one exists.  A successfully loaded index lets a
227            // restarted replica resume from where it left off without a
228            // full network restore; a missing or corrupt file falls back
229            // to a fresh in-memory index (caller will need to bootstrap).
230            if let Some(ref home) = config.env_home {
231                match crate::vlsn::persist::load_from_disk(home) {
232                    Ok(Some(idx)) => {
233                        log::info!(
234                            "Node '{}' loaded persisted VLSN index from {} \
235                             ({} entries, latest vlsn={})",
236                            config.node_name,
237                            home.display(),
238                            idx.snapshot_entries().len(),
239                            idx.get_latest_vlsn(),
240                        );
241                        Arc::new(idx)
242                    }
243                    Ok(None) => Arc::new(VlsnIndex::new(10)),
244                    Err(e) => {
245                        log::warn!(
246                            "Node '{}' failed to load persisted VLSN index \
247                             from {}: {} (treating as fresh node — network \
248                             restore required)",
249                            config.node_name,
250                            home.display(),
251                            e
252                        );
253                        // Best-effort: remove the corrupt file so the
254                        // next persist cycle writes a clean one.  A
255                        // missing file is the "fresh node" baseline.
256                        let _ = std::fs::remove_file(
257                            crate::vlsn::persist::index_path(home),
258                        );
259                        Arc::new(VlsnIndex::new(10))
260                    }
261                }
262            } else {
263                Arc::new(VlsnIndex::new(10))
264            }
265        };
266        let ack_tracker = AckTracker::new();
267        let stats = RepStats::new();
268        let feeders = RwLock::new(Vec::new());
269        let replica_stream = ReplicaStream::new();
270        let master_tracker = MasterTracker::new(DEFAULT_HEARTBEAT_TIMEOUT);
271
272        // Start the TCP service dispatcher.
273        //
274        // equivalent: `RepImpl.open()` calls `serviceDispatcher.start()`
275        // which binds a ServerSocketChannel on the configured port and begins
276        // accepting connections. We do the same here using the node_host and
277        // node_port from RepConfig.
278        let listen_addr_str =
279            format!("{}:{}", config.node_host, config.node_port);
280        let mut restore_registered_init = false;
281
282        let (tcp_dispatcher, bound_addr) = match listen_addr_str
283            .parse::<SocketAddr>()
284        {
285            Ok(addr) => {
286                match TcpServiceDispatcher::new(addr) {
287                    Ok(dispatcher) => match dispatcher.start() {
288                        Ok(bound) => {
289                            // Register the network restore handler so any
290                            // node in the group can request a full file-set
291                            // copy from this node's environment.
292                            if let Some(ref home) = config.env_home {
293                                let restore_server =
294                                    NetworkRestoreServer::new(home.clone());
295                                dispatcher.register(
296                                    RESTORE_SERVICE_NAME,
297                                    Arc::new(restore_server),
298                                );
299                                log::debug!(
300                                    "Node '{}' RESTORE service registered \
301                                         (env_home={})",
302                                    config.node_name,
303                                    home.display(),
304                                );
305                                restore_registered_init = true;
306                            }
307                            log::info!(
308                                "Node '{}' TCP service dispatcher started on {}",
309                                config.node_name,
310                                bound
311                            );
312                            (Some(dispatcher), Some(bound))
313                        }
314                        Err(e) => {
315                            log::warn!(
316                                "Node '{}' failed to start TCP dispatcher on {}: {}",
317                                config.node_name,
318                                listen_addr_str,
319                                e
320                            );
321                            (None, None)
322                        }
323                    },
324                    Err(e) => {
325                        log::warn!(
326                            "Node '{}' failed to create TCP dispatcher: {}",
327                            config.node_name,
328                            e
329                        );
330                        (None, None)
331                    }
332                }
333            }
334            Err(e) => {
335                log::warn!(
336                    "Node '{}' cannot parse listen address '{}': {}",
337                    config.node_name,
338                    listen_addr_str,
339                    e
340                );
341                (None, None)
342            }
343        };
344
345        // Build the in-memory peer log scanner; register the peer feeder
346        // service on the dispatcher so downstream replicas can connect.
347        let peer_scanner = Arc::new(PeerLogScanner::new());
348        // F5/F31: build the acceptor state with persistence enabled when
349        // env_home is configured.  Crash-durable promises are required
350        // for the Paxos safety invariant after a process restart.
351        let election_state =
352            Arc::new(if let Some(ref home) = config.env_home {
353                ElectionAcceptorState::with_env_home(
354                    config.node_name.clone(),
355                    1,
356                    home,
357                )
358            } else {
359                ElectionAcceptorState::new(config.node_name.clone(), 1)
360            });
361        if let Some(ref dispatcher) = tcp_dispatcher {
362            let service = PeerFeederService::new(Arc::clone(&peer_scanner));
363            dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(service));
364            log::debug!(
365                "Node '{}' PEER_FEEDER service registered",
366                config.node_name,
367            );
368            // F6: register the ELECTION service so peers can run
369            // run_acceptor against this node when proposing.
370            let election_svc =
371                Arc::new(ElectionService::new(Arc::clone(&election_state)));
372            dispatcher.register(ELECTION_SERVICE_NAME, election_svc);
373            log::debug!(
374                "Node '{}' ELECTION service registered",
375                config.node_name,
376            );
377        }
378
379        let env = Self {
380            config,
381            node_state,
382            group_service,
383            vlsn_index,
384            ack_tracker,
385            stats,
386            feeders,
387            replica_stream,
388            master_tracker,
389            listeners: RwLock::new(Vec::new()),
390            shutdown: AtomicBool::new(false),
391            tcp_dispatcher,
392            bound_addr,
393            env_impl: StdMutex::new(None),
394            io_threads: StdMutex::new(Vec::new()),
395            io_shutdown: AtomicBool::new(false),
396            restore_registered: AtomicBool::new(restore_registered_init),
397            peer_scanner,
398            commit_ack_seq: std::sync::atomic::AtomicU64::new(1),
399            election_state,
400            self_weak: OnceLock::new(),
401        };
402
403        Ok(env)
404    }
405
406    /// Open a replicated environment with the standard production
407    /// lifecycle.
408    ///
409    /// This is the entry point recommended by the mdBook chapters:
410    /// it allocates the `ReplicatedEnvironment`, registers all
411    /// services on the TCP dispatcher, and spawns the **election
412    /// driver** background thread that runs Paxos rounds against
413    /// known peers until the node has resolved into either Master or
414    /// Replica state.
415    ///
416    /// Closes finding F6 of `docs/src/internal/api-audit-2026-05-rep.md`.
417    ///
418    /// Use [`ReplicatedEnvironment::new`] directly only when the
419    /// caller plans to drive state transitions explicitly (test
420    /// harnesses, scripted bootstrap, recovery tooling).
421    pub fn open(config: RepConfig) -> Result<Arc<Self>> {
422        let env = Arc::new(Self::new(config)?);
423        env.init_self_weak();
424        env.start_election_driver();
425        env.start_vlsn_persistence_daemon();
426        env.register_admin_service();
427        Ok(env)
428    }
429
430    /// Populate the env's self-referential `Weak` so background
431    /// threads can obtain a back-reference for auto-orchestrated
432    /// follow-up actions (e.g. replica auto-bootstrap on
433    /// `NeedsRestore`).  Idempotent: subsequent calls are silent
434    /// no-ops because the inner [`OnceLock`] only accepts one set.
435    ///
436    /// Callers that wrap the env in `Arc` and want auto-bootstrap
437    /// behaviour should call this immediately after construction.
438    /// `Self::open` already does so.  Test harnesses that drive
439    /// transitions manually (`RepTestBase`) also call this so the
440    /// auto-bootstrap path is exercised in tests.
441    pub fn init_self_weak(self: &Arc<Self>) {
442        let _ = self.self_weak.set(Arc::downgrade(self));
443    }
444
445    /// Register the `ADMIN` service handler on the TCP dispatcher.
446    ///
447    /// Closes findings F7 / F8.  Holds a `Weak<Self>` so the handler
448    /// does not extend the env's lifetime.  Idempotent: re-registering
449    /// is harmless because `TcpServiceDispatcher::register` overwrites
450    /// the existing handler.
451    pub fn register_admin_service(self: &Arc<Self>) {
452        if let Some(ref dispatcher) = self.tcp_dispatcher {
453            crate::group_admin::register_admin_service(
454                dispatcher,
455                Arc::downgrade(self),
456            );
457            log::debug!(
458                "Node '{}' ADMIN service registered",
459                self.config.node_name,
460            );
461        }
462    }
463
464    /// Spawn the VLSN-index persistence daemon (F11).
465    ///
466    /// Periodically (every 2 seconds) snapshots the in-memory
467    /// `VlsnIndex` to `<env_home>/vlsn.idx` so a clean restart can
468    /// resume from where the replica left off without a full network
469    /// restore.  No-op when `config.env_home` is `None`.
470    ///
471    /// Idempotent: only one daemon is ever spawned per env.
472    pub fn start_vlsn_persistence_daemon(self: &Arc<Self>) {
473        let Some(home) = self.config.env_home.clone() else {
474            return;
475        };
476        {
477            let threads = self.io_threads.lock().unwrap();
478            if threads.iter().any(|h| {
479                h.thread()
480                    .name()
481                    .is_some_and(|n| n.starts_with("noxu-vlsn-flush-"))
482            }) {
483                return;
484            }
485        }
486
487        let vlsn_index = Arc::clone(&self.vlsn_index);
488        let name = format!("noxu-vlsn-flush-{}", self.config.node_name);
489        let me = Arc::clone(self);
490        let interval = Duration::from_secs(2);
491
492        let handle = std::thread::Builder::new()
493            .name(name)
494            .spawn(move || {
495                use std::sync::atomic::Ordering;
496                let mut last_persisted_vlsn: u64 = 0;
497                while !me.io_shutdown.load(Ordering::SeqCst)
498                    && !me.is_shutdown()
499                {
500                    std::thread::sleep(interval);
501                    if me.io_shutdown.load(Ordering::SeqCst) {
502                        break;
503                    }
504                    let latest = vlsn_index.get_latest_vlsn();
505                    if latest == last_persisted_vlsn {
506                        // Nothing new to flush.
507                        continue;
508                    }
509                    // X-2: cap the flush at the last durable checkpoint's
510                    // end LSN so the persisted VLSN index never claims
511                    // VLSNs beyond the durable B-tree state.  After a crash
512                    // the recovered tree and the index will be coherent.
513                    let cap_lsn = me
514                        .env_impl
515                        .lock()
516                        .unwrap()
517                        .as_ref()
518                        .and_then(|e| e.get_checkpointer())
519                        .map(|c| c.get_last_checkpoint_end())
520                        .unwrap_or(noxu_util::NULL_LSN);
521                    match crate::vlsn::persist::flush_to_disk_capped(
522                        &vlsn_index,
523                        &home,
524                        cap_lsn,
525                    ) {
526                        Ok(n) => {
527                            log::trace!(
528                                "vlsn-flush: persisted {} entries (latest vlsn={}, cap_lsn={:?})",
529                                n,
530                                latest,
531                                cap_lsn,
532                            );
533                            last_persisted_vlsn = latest;
534                        }
535                        Err(e) => {
536                            log::warn!(
537                                "vlsn-flush: failed to persist VLSN index to {}: {}",
538                                home.display(),
539                                e
540                            );
541                        }
542                    }
543                }
544                // Final flush on shutdown so a clean close is recoverable.
545                // Cap at the last checkpoint even for the shutdown flush.
546                let cap_lsn = me
547                    .env_impl
548                    .lock()
549                    .unwrap()
550                    .as_ref()
551                    .and_then(|e| e.get_checkpointer())
552                    .map(|c| c.get_last_checkpoint_end())
553                    .unwrap_or(noxu_util::NULL_LSN);
554                if let Err(e) = crate::vlsn::persist::flush_to_disk_capped(
555                    &vlsn_index,
556                    &home,
557                    cap_lsn,
558                ) {
559                    log::warn!(
560                        "vlsn-flush (final): failed to persist VLSN index: {}",
561                        e
562                    );
563                }
564            })
565            .expect("failed to spawn noxu-vlsn-flush thread");
566
567        self.io_threads.lock().unwrap().push(handle);
568        log::debug!(
569            "Node '{}' VLSN persistence daemon started",
570            self.config.node_name,
571        );
572    }
573
574    /// Spawn the election driver background thread.
575    ///
576    /// While the env is in `Detached` or `Unknown` state and no master
577    /// is known, the driver periodically attempts a Paxos election
578    /// against peers in `GroupService` (whose ELECTION services were
579    /// registered at `open()` time).  On success the driver calls
580    /// `become_master` (if this node is the winner) or `become_replica`
581    /// (otherwise).  On failure (no quorum), the driver waits
582    /// `config.election_timeout` and tries again.
583    ///
584    /// The driver respects `io_shutdown`; on env close the loop exits
585    /// promptly.
586    ///
587    /// Idempotent: a second call is a no-op (only one driver thread is
588    /// ever spawned per env).
589    pub fn start_election_driver(self: &Arc<Self>) {
590        use std::sync::atomic::Ordering;
591        // Reuse io_shutdown for cancellation; a successful spawn is
592        // recorded by appending to io_threads, so a duplicate call
593        // would re-add a thread — we use a one-shot `AtomicBool`
594        // sentinel placed in the io_shutdown's slot via a new field.
595        // Cheaper: a static name check on io_threads is impossible;
596        // instead, gate spawning on whether any io_thread already
597        // carries the driver name.
598        {
599            let threads = self.io_threads.lock().unwrap();
600            if threads.iter().any(|h| {
601                h.thread()
602                    .name()
603                    .is_some_and(|n| n.starts_with("noxu-election-"))
604            }) {
605                return;
606            }
607        }
608
609        let me = Arc::clone(self);
610        let name = format!("noxu-election-{}", self.config.node_name);
611        let handle = std::thread::Builder::new()
612            .name(name)
613            .spawn(move || {
614                me.run_election_loop();
615            })
616            .expect("failed to spawn election driver thread");
617        self.io_threads.lock().unwrap().push(handle);
618        log::debug!("Node '{}' election driver started", self.config.node_name,);
619        // Keep ordering sane on the io_shutdown flag.
620        let _ = self.io_shutdown.load(Ordering::SeqCst);
621    }
622
    /// Body of the election driver loop.  Private; called by
    /// [`Self::start_election_driver`] on the dedicated driver thread.
625    fn run_election_loop(self: Arc<Self>) {
626        use std::sync::atomic::Ordering;
627        // Maintain an internal monotonically increasing election term.
628        // Each successful or failed round bumps the term so retries do
629        // not collide with stale acceptor promises.
630        let mut term: u64 = 1;
631
632        loop {
633            if self.io_shutdown.load(Ordering::SeqCst) {
634                return;
635            }
636            if self.is_shutdown() {
637                return;
638            }
639
640            let state = self.node_state.get_state();
641            // Stop driving once we've resolved into Master/Replica;
642            // re-arm only if the node returns to Unknown.
643            if matches!(state, NodeState::Master | NodeState::Replica) {
644                std::thread::sleep(Duration::from_millis(200));
645                continue;
646            }
647            if matches!(state, NodeState::Shutdown) {
648                return;
649            }
650
651            // Probe peers for an active master via the existing
652            // GroupService cache.  In the absence of a heartbeat path
653            // we rely on master_tracker (set by become_replica from
654            // the receive loop).
655            if let Some(master_name) = self.master_tracker.get_master()
656                && master_name != self.config.node_name
657            {
658                let _ = self.become_replica(&master_name);
659                continue;
660            }
661
662            // Snapshot peers to dial for ELECTION.
663            let peers: Vec<(String, SocketAddr)> = self
664                .group_service
665                .get_all_nodes()
666                .into_iter()
667                .filter(|n| n.name != self.config.node_name)
668                .filter_map(|n| {
669                    format!("{}:{}", n.host, n.port)
670                        .parse::<SocketAddr>()
671                        .ok()
672                        .map(|a| (n.name, a))
673                })
674                .collect();
675
676            // Build the local rep group view used by run_election to
677            // compute quorum and resolve the winner name.  Include
678            // self.
679            let group = self.local_rep_group_with_self();
680
681            // Update election state for any concurrent acceptor calls.
682            let our_vlsn = self.vlsn_index.get_latest_vlsn();
683            self.election_state.set_vlsn(our_vlsn);
684            self.election_state.set_term(term);
685
686            // Connect to each peer's ELECTION service.  Failures are
687            // tolerated: a peer that doesn't answer simply contributes
688            // no vote.  The election may still reach quorum in the
689            // remaining peers.
690            let mut channels: Vec<Arc<dyn crate::net::channel::Channel>> =
691                Vec::new();
692            for (peer_name, addr) in &peers {
693                match crate::net::service_dispatcher::connect_to_service(
694                    *addr,
695                    ELECTION_SERVICE_NAME,
696                ) {
697                    Ok(ch) => {
698                        let arc: Arc<dyn crate::net::channel::Channel> =
699                            Arc::new(ch);
700                        channels.push(arc);
701                    }
702                    Err(e) => {
703                        log::trace!(
704                            "election driver: peer {} ({}) unreachable: {}",
705                            peer_name,
706                            addr,
707                            e
708                        );
709                    }
710                }
711            }
712
713            // Resolve our own node_id from the group; if not present
714            // we cannot run an election (closed-world guard — see F22).
715            let self_node_id =
716                group.get_node(&self.config.node_name).map(|n| n.node_id());
717            let self_node_id = match self_node_id {
718                Some(id) => id,
719                None => {
720                    log::warn!(
721                        "election driver: node '{}' not registered in \
722                         own group view; sleeping",
723                        self.config.node_name
724                    );
725                    std::thread::sleep(Duration::from_millis(200));
726                    continue;
727                }
728            };
729
730            log::debug!(
731                "election driver on '{}': starting term={} with {} peers",
732                self.config.node_name,
733                term,
734                channels.len(),
735            );
736            let outcome = crate::elections::paxos::run_election(
737                self_node_id,
738                &self.config.node_name,
739                &group,
740                &channels,
741                our_vlsn,
742                /* priority */ 1,
743                term,
744            );
745
746            match outcome {
747                Some(winner_id) if winner_id == self_node_id => {
748                    if let Err(e) = self.become_master(term) {
749                        log::warn!(
750                            "election driver: become_master failed: {}",
751                            e
752                        );
753                    } else {
754                        log::info!(
755                            "election driver: '{}' became master at term {}",
756                            self.config.node_name,
757                            term,
758                        );
759                    }
760                }
761                Some(winner_id) => {
762                    if let Some(winner_node) = group
763                        .get_nodes()
764                        .into_iter()
765                        .find(|n| n.node_id() == winner_id)
766                    {
767                        if let Err(e) = self.become_replica(&winner_node.name) {
768                            log::warn!(
769                                "election driver: become_replica failed: {}",
770                                e
771                            );
772                        } else {
773                            log::info!(
774                                "election driver: '{}' became replica of '{}' at term {}",
775                                self.config.node_name,
776                                winner_node.name,
777                                term,
778                            );
779                        }
780                    }
781                }
782                None => {
783                    log::debug!(
784                        "election driver on '{}' term={}: no quorum",
785                        self.config.node_name,
786                        term,
787                    );
788                }
789            }
790
791            term = term.saturating_add(1);
792            // Back off so we don't pin the loop on transient failures.
793            std::thread::sleep(
794                self.config.election_timeout.min(Duration::from_millis(500)),
795            );
796        }
797    }
798
799    /// Internal: a `RepGroup` snapshot that includes self.
800    fn local_rep_group_with_self(&self) -> crate::rep_group::RepGroup {
801        let mut group = self.get_rep_group();
802        // Ensure self is present in the group view; the
803        // group_service does not auto-register the local node.
804        if group.get_node(&self.config.node_name).is_none() {
805            let mut self_node = crate::rep_node::RepNode::new(
806                self.config.node_name.clone(),
807                self.config.node_type,
808                self.config.node_host.clone(),
809                self.config.node_port,
810                /* node_id */ 0,
811            );
812            // Stable self node_id derived from the name hash so
813            // re-creations in the same process don't collide.
814            use std::hash::{Hash, Hasher};
815            let mut hasher = std::collections::hash_map::DefaultHasher::new();
816            self.config.node_name.hash(&mut hasher);
817            // Restrict to a u32 range and avoid 0 (reserved for
818            // "unknown").
819            let id = ((hasher.finish() as u32) | 1).max(1);
820            self_node.node_id = id;
821            group.add_node(self_node);
822        }
823        group
824    }
825
826    /// Return the socket address the TCP service dispatcher is bound to.
827    ///
828    /// This may differ from the configured `node_port` when port 0 is used
829    /// (the OS assigns an ephemeral port). Returns `None` if the dispatcher
830    /// could not be started (e.g. the address is not resolvable).
831    pub fn bound_addr(&self) -> Option<SocketAddr> {
832        self.bound_addr
833    }
834
835    /// Wire a live `EnvironmentImpl` into this replicated environment.
836    ///
837    /// After this call, state transitions (`become_master`, `become_replica`)
838    /// will spawn real feeder/receiver I/O threads backed by the live log.
839    ///
840    /// If the RESTORE service was not registered at construction time (because
841    /// `config.env_home` was `None`), it is registered here using the
842    /// environment's actual home path.  This mirrors`RepNode.envSetup()`
843    /// which registers the restore handler during environment wiring.
844    ///
845    /// Environment reference wiring.
846    /// `EnvironmentImpl` via `RepImpl.repNode.envImpl` in HA.
847    pub fn with_environment(&self, env: Arc<EnvironmentImpl>) {
848        // Register RESTORE service lazily if not already done.
849        if !self.restore_registered.load(Ordering::SeqCst)
850            && let Some(ref dispatcher) = self.tcp_dispatcher
851        {
852            let env_home = env.get_env_home().to_path_buf();
853            let restore_server = NetworkRestoreServer::new(env_home.clone());
854            dispatcher.register(RESTORE_SERVICE_NAME, Arc::new(restore_server));
855            self.restore_registered.store(true, Ordering::SeqCst);
856            log::debug!(
857                "Node '{}' RESTORE service registered via with_environment \
858                 (env_home={})",
859                self.config.node_name,
860                env_home.display(),
861            );
862        }
863
864        // X-14: rebuild the VLSN index from recovery-replayed LN entries.
865        // After a crash the on-disk vlsn.idx may be stale (either ahead of
866        // the recovered B-tree, or behind if vlsn.idx was not flushed
867        // after the last checkpoint).  Re-registering all (vlsn, lsn) pairs
868        // from the redo pass gives a consistent in-memory index.
869        if !env.recovery_vlsns.is_empty() {
870            log::info!(
871                "Node '{}': rebuilding VLSN index from {} recovered entries",
872                self.config.node_name,
873                env.recovery_vlsns.len(),
874            );
875            for &(vlsn, lsn_u64) in &env.recovery_vlsns {
876                let lsn = noxu_util::Lsn::from_u64(lsn_u64);
877                self.vlsn_index.register(
878                    vlsn,
879                    lsn.file_number(),
880                    lsn.file_offset(),
881                );
882            }
883        }
884
885        // X-1: truncate the VLSN index to the rollback matchpoint if recovery
886        // detected a completed rollback period.  The matchpoint is the highest
887        // LSN that is still valid after the rollback; entries with higher VLSNs
888        // correspond to data that was rolled back and must not appear in the
889        // index.
890        if let Some(matchpoint_lsn_u64) = env.recovery_rollback_matchpoint {
891            // Find the latest VLSN whose LSN is at or before the matchpoint.
892            // Scan the recovered VLSN pairs (sorted ascending) to find the
893            // boundary.
894            let safe_vlsn = env
895                .recovery_vlsns
896                .iter()
897                .rev()
898                .find(|&&(_, lsn_u64)| lsn_u64 <= matchpoint_lsn_u64)
899                .map(|&(vlsn, _)| vlsn)
900                .unwrap_or(0);
901            log::info!(
902                "Node '{}': truncating VLSN index after vlsn={} \
903                 (rollback matchpoint lsn={:#x})",
904                self.config.node_name,
905                safe_vlsn,
906                matchpoint_lsn_u64,
907            );
908            self.vlsn_index.truncate_after(safe_vlsn);
909        }
910
911        *self.env_impl.lock().unwrap() = Some(env);
912    }
913
914    /// Get the current node state.
915    ///
916    ///
917    ///
918    /// Returns the current state of the node associated with this replication
919    /// environment. If the caller's intent is to track the state of the node,
920    /// `StateChangeListener` may be a more convenient and efficient approach.
921    pub fn get_state(&self) -> NodeState {
922        self.node_state.get_state()
923    }
924
925    /// Check if this node is the master.
926    ///
927    /// Returns true if the node's current state is Master.
928    pub fn is_master(&self) -> bool {
929        self.node_state.get_state() == NodeState::Master
930    }
931
932    /// Check if this node is a replica.
933    ///
934    /// Returns true if the node's current state is Replica.
935    pub fn is_replica(&self) -> bool {
936        self.node_state.get_state() == NodeState::Replica
937    }
938
939    /// Returns true if the node is currently participating in the group
940    /// as a Replica or a Master.
941    pub fn is_active(&self) -> bool {
942        self.node_state.get_state().is_active()
943    }
944
945    /// Get the node name.
946    ///
947    ///
948    ///
949    /// Returns the unique name used to identify this replicated environment.
950    pub fn get_node_name(&self) -> &str {
951        self.config.node_name.as_str()
952    }
953
954    /// Get the group name.
955    ///
956    /// Returns the name of the replication group this node belongs to.
957    pub fn get_group_name(&self) -> &str {
958        self.config.group_name.as_str()
959    }
960
961    /// Get the current master (if known).
962    ///
963    /// Returns the name of the node that is currently the master, or None
964    /// if the master is not known (e.g. the node is in the Unknown or
965    /// Detached state).
966    pub fn get_master_name(&self) -> Option<String> {
967        self.master_tracker.get_master()
968    }
969
970    /// Get the replication group info.
971    ///
972    ///
973    ///
974    /// Returns a description of the replication group as known by this node.
975    /// The replicated group metadata is stored in a replicated database and
976    /// updates are propagated by the current master node to all replicas. If
977    /// this node is not the master, it is possible for its description of the
978    /// group to be out of date.
979    pub fn get_group(&self) -> &GroupService {
980        &self.group_service
981    }
982
983    /// Add a peer node to the replication group at runtime.
984    ///
985    /// The node is registered in the `GroupService` so elections and quorum
986    /// calculations immediately reflect the new membership.
987    pub fn add_peer(&self, node: crate::rep_node::RepNode) -> Result<()> {
988        use crate::group_service::NodeInfo;
989        use std::time::Instant;
990
991        let info = NodeInfo {
992            name: node.name.clone(),
993            node_type: node.node_type,
994            host: node.host.clone(),
995            port: node.port,
996            node_id: node.node_id,
997            joined_at: Instant::now(),
998            last_seen: Instant::now(),
999            is_active: true,
1000            known_vlsn: 0,
1001            log_range: None,
1002            read_capacity_pct: node.read_capacity_pct,
1003            write_capacity_pct: node.write_capacity_pct,
1004            latency_hint_ms: node.latency_hint_ms,
1005        };
1006        self.group_service.add_node(info)?;
1007        log::info!(
1008            "Node '{}': added peer '{}' ({}:{}) to group '{}'",
1009            self.config.node_name,
1010            node.name,
1011            node.host,
1012            node.port,
1013            self.config.group_name,
1014        );
1015
1016        // F9: if we are the current master, immediately register a
1017        // `Feeder` tracker for the new peer so AckTracker bookkeeping
1018        // and downstream pull-based streaming work without a forced
1019        // re-election.
1020        if self.is_master()
1021            && (node.node_type == crate::node_type::NodeType::Electable
1022                || node.node_type == crate::node_type::NodeType::Secondary)
1023        {
1024            let mut feeders = self.feeders.write();
1025            if !feeders.iter().any(|f| f.get_replica_name() == node.name) {
1026                feeders.push(Feeder::new(node.name.clone()));
1027                log::debug!(
1028                    "Node '{}' (master): dispatched Feeder for new peer '{}'",
1029                    self.config.node_name,
1030                    node.name,
1031                );
1032            }
1033        }
1034        Ok(())
1035    }
1036
1037    /// Remove a peer node from the replication group by name.
1038    ///
1039    /// The node is deregistered from the `GroupService`.  Elections initiated
1040    /// after this call will not include the removed node in quorum calculations.
1041    pub fn remove_peer(&self, name: &str) -> Result<()> {
1042        self.group_service.remove_node(name)?;
1043        log::info!(
1044            "Node '{}': removed peer '{}' from group '{}'",
1045            self.config.node_name,
1046            name,
1047            self.config.group_name,
1048        );
1049        Ok(())
1050    }
1051
1052    /// Update the capacity and latency metadata of an existing peer.
1053    ///
1054    /// Only the following fields are updated from `node`:
1055    ///   - `read_capacity_pct`
1056    ///   - `write_capacity_pct`
1057    ///   - `latency_hint_ms`
1058    ///
1059    /// The node's identity (name, address, port, node_type) is preserved.
1060    /// Safe to call while replication is active.
1061    ///
1062    /// If the quorum policy is `Flexible` or `Expression`, the quorum system
1063    /// is rebuilt to reflect the new capacity/latency weights.
1064    ///
1065    /// # Note
1066    ///
1067    /// `update_peer_metadata` does not currently re-run
1068    /// `QuorumPolicy::validate(electable_count)` after the metadata
1069    /// change.  An LP-optimal `Expression` quorum that was safe before
1070    /// the update may no longer satisfy the intersection property
1071    /// afterwards.  Until automatic revalidation lands, deployments
1072    /// using `QuorumPolicy::Expression` should call
1073    /// `quorum_policy().validate(get_rep_group().electable_count())`
1074    /// on the returned `RepGroup` after every metadata change and
1075    /// fail the operator-facing operation if validation reports
1076    /// unsafety.
1077    pub fn update_peer_metadata(
1078        &self,
1079        name: &str,
1080        node: crate::rep_node::RepNode,
1081    ) -> Result<()> {
1082        self.group_service.update_node_metadata(
1083            name,
1084            node.read_capacity_pct,
1085            node.write_capacity_pct,
1086            node.latency_hint_ms,
1087        )?;
1088        log::info!(
1089            "Node '{}': updated metadata for peer '{}' \
1090             (read_cap={}, write_cap={}, latency={}ms)",
1091            self.config.node_name,
1092            name,
1093            node.read_capacity_pct,
1094            node.write_capacity_pct,
1095            node.latency_hint_ms,
1096        );
1097        Ok(())
1098    }
1099
1100    /// Returns a snapshot of the current replication group as a `RepGroup`.
1101    ///
1102    /// The snapshot reflects the state at the time of the call; subsequent
1103    /// `add_peer` / `remove_peer` calls are not reflected in it.
1104    pub fn get_rep_group(&self) -> crate::rep_group::RepGroup {
1105        use crate::rep_group::RepGroup;
1106
1107        let mut group = RepGroup::new(
1108            self.config.group_name.clone(),
1109            self.group_service.get_group_id(),
1110        );
1111        for info in self.group_service.get_all_nodes() {
1112            let mut node = crate::rep_node::RepNode::new(
1113                info.name.clone(),
1114                info.node_type,
1115                info.host.clone(),
1116                info.port,
1117                info.node_id,
1118            );
1119            node.read_capacity_pct = info.read_capacity_pct;
1120            node.write_capacity_pct = info.write_capacity_pct;
1121            node.latency_hint_ms = info.latency_hint_ms;
1122            group.add_node(node);
1123        }
1124        group
1125    }
1126
1127    /// Get the replication configuration.
1128    ///
1129    ///
1130    ///
1131    /// Returns the replication configuration that has been used to create this
1132    /// environment.
1133    pub fn get_config(&self) -> &RepConfig {
1134        &self.config
1135    }
1136
1137    /// Get the current VLSN range on this node.
1138    ///
1139    /// Returns the range of VLSNs currently available on this node.
1140    pub fn get_vlsn_range(&self) -> VlsnRange {
1141        self.vlsn_index.get_range()
1142    }
1143
1144    /// Get the latest VLSN.
1145    ///
1146    /// Returns the most recent VLSN registered on this node.
1147    pub fn get_current_vlsn(&self) -> u64 {
1148        self.vlsn_index.get_latest_vlsn()
1149    }
1150
1151    /// Return the list of replica names that currently have a `Feeder`
1152    /// tracker on this (master) node.
1153    ///
1154    /// Used by tests and operator tooling.  The returned list reflects
1155    /// the master's view at the time of the call; subsequent
1156    /// `add_peer`/`remove_peer` calls may change it.
1157    pub fn feeder_replica_names(&self) -> Vec<String> {
1158        self.feeders.read().iter().map(|f| f.get_replica_name()).collect()
1159    }
1160
1161    /// Bootstrap this node's environment by network-restoring all `.ndb`
1162    /// files from `peer_name` via the dispatcher's RESTORE service.
1163    ///
1164    /// Closes findings F2 / F4 of `docs/src/internal/api-audit-2026-05-rep.md`.
1165    ///
1166    /// The standalone `NetworkRestore::execute()` opens raw TCP and
1167    /// expects to drive the legacy `NetworkRestoreServer::start` listener.
1168    /// Production replicated environments host the RESTORE handler on the
1169    /// dispatcher, so this method routes through `execute_via_dispatcher`.
1170    ///
1171    /// `peer_name` must be a known peer in `GroupService`; on success the
1172    /// peer's `.ndb` files are written into `config.env_home`.  Returns
1173    /// `Err` if `env_home` is `None`, the peer is unknown, or the restore
1174    /// fails for any reason.
1175    pub fn bootstrap_via_dispatcher(&self, peer_name: &str) -> Result<()> {
1176        let env_home = self.config.env_home.clone().ok_or_else(|| {
1177            RepError::ConfigError(
1178                "bootstrap_via_dispatcher requires env_home in RepConfig"
1179                    .into(),
1180            )
1181        })?;
1182        let peer_info = self
1183            .group_service
1184            .get_all_nodes()
1185            .into_iter()
1186            .find(|n| n.name == peer_name)
1187            .ok_or_else(|| {
1188                RepError::ConfigError(format!(
1189                    "peer '{}' not registered in group '{}'",
1190                    peer_name, self.config.group_name,
1191                ))
1192            })?;
1193
1194        let cfg = NetworkRestoreConfig {
1195            source_node: peer_info.name.clone(),
1196            source_host: peer_info.host.clone(),
1197            source_port: peer_info.port,
1198            retain_log_files: true,
1199        };
1200        let restore = NetworkRestore::new(cfg).with_local_dir(env_home);
1201        restore.execute_via_dispatcher()?;
1202        log::info!(
1203            "Node '{}' bootstrapped via dispatcher from '{}' ({}:{})",
1204            self.config.node_name,
1205            peer_info.name,
1206            peer_info.host,
1207            peer_info.port,
1208        );
1209        Ok(())
1210    }
1211
1212    /// Get replication statistics.
1213    ///
1214    ///
1215    ///
1216    /// Returns statistics associated with this environment.
1217    pub fn get_stats(&self) -> &RepStats {
1218        &self.stats
1219    }
1220
1221    /// Get the ack tracker.
1222    pub fn get_ack_tracker(&self) -> &AckTracker {
1223        &self.ack_tracker
1224    }
1225
1226    /// Ensure the node state machine is in Unknown state, transitioning
1227    /// from Detached if necessary. This is needed because the state machine
1228    /// only allows Detached -> Unknown -> Master/Replica.
1229    pub fn ensure_unknown_state(&self) -> Result<()> {
1230        let current = self.node_state.get_state();
1231        match current {
1232            NodeState::Unknown => Ok(()),
1233            NodeState::Detached => {
1234                self.node_state.transition_to(NodeState::Unknown)?;
1235                Ok(())
1236            }
1237            // Master and Replica must transition through Unknown before
1238            // joining a new group or reconnecting.
1239            NodeState::Master | NodeState::Replica => {
1240                self.node_state.transition_to(NodeState::Unknown)?;
1241                Ok(())
1242            }
1243            NodeState::Shutdown => {
1244                Err(RepError::StateError("Node is shut down".to_string()))
1245            }
1246        }
1247    }
1248
1249    /// Transition to master state.
1250    ///
1251    /// Transitions this node to Master state for the given election term.
1252    /// As master, the node can accept write operations and feed log entries
1253    /// to replicas.
1254    ///
1255    /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
1256    /// a `FeederRunner` + `EnvironmentLogScanner` background thread is spawned
1257    /// for each currently-registered replica (feeder entries in `feeders`).
1258    ///
1259    /// In HA.
1260    pub fn become_master(&self, term: u64) -> Result<()> {
1261        if self.is_shutdown() {
1262            return Err(RepError::StateError(
1263                "Cannot become master: environment is closed".to_string(),
1264            ));
1265        }
1266
1267        // JE invariant: only `Electable` nodes can become master.  `Secondary`,
1268        // `Monitor`, and `Arbiter` are not electable and must be rejected at
1269        // the API layer (mirrors JE `ExceptionTest`).  See
1270        // `NodeType::can_be_master`.
1271        if !self.config.node_type.can_be_master() {
1272            return Err(RepError::InvalidStateTransition(format!(
1273                "node '{}' has type {} which is not electable as master",
1274                self.config.node_name.as_str(),
1275                self.config.node_type,
1276            )));
1277        }
1278
1279        // Ensure we can reach Master state (may need Detached -> Unknown first)
1280        self.ensure_unknown_state()?;
1281
1282        let old_state = self.node_state.get_state();
1283        self.node_state.transition_to(NodeState::Master)?;
1284        self.master_tracker.set_master(self.config.node_name.as_str(), term);
1285
1286        // --- F9: spawn Feeder trackers for each known replica -------------
1287        //
1288        // Closes finding F9 of `docs/src/internal/api-audit-2026-05-rep.md`.
1289        // The architecture is pull-based: replicas pull from the master's
1290        // `PEER_FEEDER` service via `catch_up_from_peer`.  However, the
1291        // master must:
1292        //   1. Track each replica via a `Feeder` so AckTracker bookkeeping
1293        //      can attribute replica acks to the right node.
1294        //   2. Push its own writes into `peer_scanner` so replicas pulling
1295        //      from `PEER_FEEDER` actually receive entries (`replicate_entry`).
1296        //
1297        // Here we ensure step 1: every known electable peer in the group
1298        // gets a `Feeder` entry.
1299        {
1300            let mut feeders = self.feeders.write();
1301            // Drop any stale feeders left over from a prior role.  A
1302            // `Feeder` is just an in-memory tracker; recreating it is
1303            // cheap and avoids state inversion bugs across role changes.
1304            feeders.clear();
1305            for peer in self.group_service.get_all_nodes() {
1306                if peer.name == self.config.node_name {
1307                    continue;
1308                }
1309                if peer.node_type != crate::node_type::NodeType::Electable
1310                    && peer.node_type != crate::node_type::NodeType::Secondary
1311                {
1312                    // Arbiters do not receive log entries.
1313                    continue;
1314                }
1315                feeders.push(Feeder::new(peer.name.clone()));
1316                log::debug!(
1317                    "Node '{}' (master, term={}): registered Feeder for \
1318                     replica '{}'",
1319                    self.config.node_name.as_str(),
1320                    term,
1321                    peer.name,
1322                );
1323            }
1324        }
1325
1326        // For observability, log the count.
1327        log::info!(
1328            "Node '{}' became master for term {} \
1329             (feeder trackers: {} known replicas)",
1330            self.config.node_name.as_str(),
1331            term,
1332            self.feeders.read().len(),
1333        );
1334
1335        // -------------------------------------------------------------------
1336
1337        // Notify listeners
1338        self.notify_listeners(old_state, NodeState::Master);
1339
1340        Ok(())
1341    }
1342
1343    /// Transition to replica state with the given master.
1344    ///
1345    /// Transitions this node to Replica state. The node will receive log
1346    /// entries from the specified master.
1347    ///
1348    /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
1349    /// the method prepares an `EnvironmentLogWriter` so that replicated
1350    /// entries can be written to the local log.  The actual network connection
1351    /// is established by the `TcpServiceDispatcher`; this method logs intent.
1352    ///
1353    /// In HA.
1354    pub fn become_replica(&self, master_name: &str) -> Result<()> {
1355        if self.is_shutdown() {
1356            return Err(RepError::StateError(
1357                "Cannot become replica: environment is closed".to_string(),
1358            ));
1359        }
1360
1361        // Ensure we can reach Replica state (may need Detached -> Unknown first)
1362        self.ensure_unknown_state()?;
1363
1364        let old_state = self.node_state.get_state();
1365        self.node_state.transition_to(NodeState::Replica)?;
1366        self.master_tracker.set_master(master_name, 0);
1367        self.replica_stream.set_master(master_name);
1368        self.replica_stream.set_state(
1369            crate::stream::replica_stream::ReplicaStreamState::Connecting,
1370        );
1371
1372        // --- G19: start replica receive loop --------------------------------
1373        //
1374        // Connects to the master's PEER_FEEDER service and runs a
1375        // ReplicaReceiver loop in a background thread.  The receiver writes
1376        // replicated entries via EnvironmentLogWriter.
1377        if let Some(env) = self.env_impl.lock().unwrap().clone() {
1378            if let Some(log_mgr) = env.get_log_manager() {
1379                let vlsn_index =
1380                    Arc::new(crate::vlsn::vlsn_index::VlsnIndex::new(10));
1381
1382                // Resolve the master's socket address from the GroupService.
1383                let master_addr_opt: Option<SocketAddr> = self
1384                    .group_service
1385                    .get_all_nodes()
1386                    .iter()
1387                    .find(|n| n.name == master_name)
1388                    .and_then(|info| {
1389                        format!("{}:{}", info.host, info.port)
1390                            .parse::<SocketAddr>()
1391                            .ok()
1392                    });
1393
1394                let node_name = self.config.node_name.clone();
1395                let master = master_name.to_string();
1396                let vlsn_index_clone = Arc::clone(&vlsn_index);
1397                let shutdown_flag = self.io_shutdown.load(Ordering::SeqCst);
1398                // Wave 9-A fix 2: capture a Weak<Self> so the I/O thread
1399                // can call `bootstrap_via_dispatcher` automatically when
1400                // the master signals `NeedsRestore`.  When the env was
1401                // never registered with `init_self_weak` (raw
1402                // `Arc::new(Self::new(...))` without going through
1403                // `open()` or the test harness), the weak ref is `None`
1404                // and we fall back to operator-driven bootstrap.
1405                let self_weak: Option<Weak<Self>> =
1406                    self.self_weak.get().cloned();
1407
1408                let handle = std::thread::Builder::new()
1409                    .name(format!("noxu-replica-{}", node_name))
1410                    .spawn(move || {
1411                        let mut writer = EnvironmentLogWriter::new(
1412                            log_mgr,
1413                            vlsn_index_clone,
1414                        );
1415
1416                        let Some(addr) = master_addr_opt else {
1417                            log::warn!(
1418                                "noxu-replica-{}: master '{}' address not in RepGroup; \
1419                                 waiting for TCP dispatcher connection",
1420                                node_name, master,
1421                            );
1422                            return;
1423                        };
1424
1425                        // Catch-up loop: catch up, observe NeedsRestore,
1426                        // optionally auto-bootstrap, retry once.  We cap
1427                        // the retry count at MAX_AUTO_BOOTSTRAP_ATTEMPTS
1428                        // (small) so a misbehaving master does not loop
1429                        // forever consuming network bandwidth.
1430                        const MAX_AUTO_BOOTSTRAP_ATTEMPTS: u32 = 2;
1431                        let mut attempts: u32 = 0;
1432                        loop {
1433                            log::info!(
1434                                "noxu-replica-{}: connecting to master '{}' at {}",
1435                                node_name, master, addr,
1436                            );
1437                            match crate::stream::peer_feeder::catch_up_from_peer(
1438                                addr, 0, &mut writer,
1439                            ) {
1440                                Ok(true) => {
1441                                    log::info!(
1442                                        "noxu-replica-{}: catch-up complete from '{}'",
1443                                        node_name, master,
1444                                    );
1445                                    return;
1446                                }
1447                                Ok(false) => {
1448                                    // F2/F4: master signals NeedsRestore.
1449                                    // Wave 9-A fix 2: if a Weak<Self> was
1450                                    // plumbed in, upgrade it and call
1451                                    // `bootstrap_via_dispatcher` ourselves
1452                                    // so the replica auto-bootstraps and
1453                                    // resumes catch-up without operator
1454                                    // intervention.
1455                                    log::warn!(
1456                                        "noxu-replica-{}: master '{}' requires restore",
1457                                        node_name, master,
1458                                    );
1459                                    attempts += 1;
1460                                    if attempts > MAX_AUTO_BOOTSTRAP_ATTEMPTS {
1461                                        log::error!(
1462                                            "noxu-replica-{}: exceeded \
1463                                             auto-bootstrap attempts ({}); giving up",
1464                                            node_name,
1465                                            MAX_AUTO_BOOTSTRAP_ATTEMPTS,
1466                                        );
1467                                        return;
1468                                    }
1469                                    let env_arc = match self_weak
1470                                        .as_ref()
1471                                        .and_then(Weak::upgrade)
1472                                    {
1473                                        Some(e) => e,
1474                                        None => {
1475                                            // No back-ref or env dropped:
1476                                            // fall back to operator-driven
1477                                            // bootstrap and exit cleanly.
1478                                            log::warn!(
1479                                                "noxu-replica-{}: no back-reference \
1480                                                 available; operator must call \
1481                                                 bootstrap_via_dispatcher manually",
1482                                                node_name,
1483                                            );
1484                                            return;
1485                                        }
1486                                    };
1487                                    if env_arc.is_shutdown() {
1488                                        return;
1489                                    }
1490                                    log::info!(
1491                                        "noxu-replica-{}: auto-bootstrapping via \
1492                                         dispatcher from '{}' (attempt {})",
1493                                        node_name, master, attempts,
1494                                    );
1495                                    match env_arc
1496                                        .bootstrap_via_dispatcher(&master)
1497                                    {
1498                                        Ok(()) => {
1499                                            log::info!(
1500                                                "noxu-replica-{}: auto-bootstrap \
1501                                                 succeeded; resuming catch-up",
1502                                                node_name,
1503                                            );
1504                                            // Drop the strong ref before
1505                                            // re-entering catch-up so we
1506                                            // do not keep the env alive
1507                                            // longer than necessary.
1508                                            drop(env_arc);
1509                                            continue;
1510                                        }
1511                                        Err(e) => {
1512                                            log::error!(
1513                                                "noxu-replica-{}: auto-bootstrap \
1514                                                 failed: {}",
1515                                                node_name, e,
1516                                            );
1517                                            return;
1518                                        }
1519                                    }
1520                                }
1521                                Err(e) => {
1522                                    if !shutdown_flag {
1523                                        log::error!(
1524                                            "noxu-replica-{}: error from master '{}': {e}",
1525                                            node_name, master,
1526                                        );
1527                                    }
1528                                    return;
1529                                }
1530                            }
1531                        }
1532                    })
1533                    .expect("failed to spawn noxu-replica thread");
1534
1535                self.io_threads.lock().unwrap().push(handle);
1536
1537                log::debug!(
1538                    "Node '{}': replica receive thread started for master '{}'",
1539                    self.config.node_name.as_str(),
1540                    master_name,
1541                );
1542            } else {
1543                log::warn!(
1544                    "Node '{}': no LogManager available (read-only env?); \
1545                     replica I/O loop not started",
1546                    self.config.node_name.as_str(),
1547                );
1548            }
1549        }
1550        // -------------------------------------------------------------------
1551
1552        // Notify listeners
1553        self.notify_listeners(old_state, NodeState::Replica);
1554
1555        log::info!(
1556            "Node '{}' became replica of master '{}'",
1557            self.config.node_name.as_str(),
1558            master_name
1559        );
1560        Ok(())
1561    }
1562
1563    /// Initiate a master transfer to the target node.
1564    ///
1565    ///
1566    ///
1567    /// Transfers the current master state from this node to one of the
1568    /// electable replicas. The replica that is actually chosen to be the new
1569    /// master is the one with which the Master Transfer can be completed most
1570    /// rapidly. The transfer operation ensures that all changes at this node
1571    /// are available at the new master upon conclusion of the operation.
1572    pub fn transfer_master(&self, config: MasterTransferConfig) -> Result<()> {
1573        if self.is_shutdown() {
1574            return Err(RepError::StateError(
1575                "Cannot transfer master: environment is closed".to_string(),
1576            ));
1577        }
1578
1579        if !self.is_master() {
1580            return Err(RepError::InvalidState(
1581                "Master transfer can only be initiated on the master node"
1582                    .to_string(),
1583            ));
1584        }
1585
1586        log::info!(
1587            "Node '{}' initiating master transfer to '{}'",
1588            self.config.node_name.as_str(),
1589            config.target_node,
1590        );
1591
1592        // Closes finding F7 of `docs/src/internal/api-audit-2026-05-rep.md`.
1593        //
1594        // Steps:
1595        //   1. Locate the target's address.
1596        //   2. Compute the new term (current observed term + 1).
1597        //   3. Send TRANSFER_MASTER to the target — it will become master.
1598        //   4. Send TRANSFER_MASTER (with the same term + new master name) to
1599        //      every other peer so they re-target.
1600        //   5. Demote self to Replica of the target.
1601        //
1602        // The transfer is best-effort: a peer that doesn't ack is logged
1603        // and skipped.  The election driver will reconcile any divergence
1604        // on the next election round.
1605
1606        let target_addr = self
1607            .group_service
1608            .get_all_nodes()
1609            .into_iter()
1610            .find(|n| n.name == config.target_node)
1611            .and_then(|n| {
1612                format!("{}:{}", n.host, n.port)
1613                    .parse::<std::net::SocketAddr>()
1614                    .ok()
1615            })
1616            .ok_or_else(|| {
1617                RepError::ConfigError(format!(
1618                    "transfer_master: target '{}' not registered or has bad address",
1619                    config.target_node
1620                ))
1621            })?;
1622
1623        let new_term = self.master_tracker.get_term().saturating_add(1);
1624
1625        // 1. Tell the target to become master at the new term.
1626        let target_ack = crate::group_admin::send_transfer_master(
1627            target_addr,
1628            &config.target_node,
1629            new_term,
1630        )
1631        .map_err(|e| {
1632            RepError::NetworkError(format!(
1633                "transfer_master: failed to signal target '{}': {}",
1634                config.target_node, e
1635            ))
1636        })?;
1637        if !target_ack {
1638            return Err(RepError::StateError(format!(
1639                "transfer_master: target '{}' rejected the transfer",
1640                config.target_node
1641            )));
1642        }
1643
1644        // 2. Inform all other peers (best-effort).
1645        for peer in self.group_service.get_all_nodes() {
1646            if peer.name == self.config.node_name
1647                || peer.name == config.target_node
1648            {
1649                continue;
1650            }
1651            if let Ok(addr) = format!("{}:{}", peer.host, peer.port).parse() {
1652                let _ = crate::group_admin::send_transfer_master(
1653                    addr,
1654                    &config.target_node,
1655                    new_term,
1656                );
1657            }
1658        }
1659
1660        // 3. Demote self to Replica of the new master.
1661        self.become_replica(&config.target_node)?;
1662
1663        log::info!(
1664            "Node '{}' transferred master to '{}' at term {}",
1665            self.config.node_name.as_str(),
1666            config.target_node,
1667            new_term,
1668        );
1669        Ok(())
1670    }
1671
1672    /// Register a VLSN (as master, after writing a log entry).
1673    ///
1674    /// Maps the given VLSN to the specified log file position. This is called
1675    /// by the master after it writes a replicated log entry.
1676    pub fn register_vlsn(&self, vlsn: u64, file_number: u32, file_offset: u32) {
1677        self.vlsn_index.register(vlsn, file_number, file_offset);
1678    }
1679
1680    /// Replicate a freshly committed log entry from the master.
1681    ///
1682    /// Closes finding F9 of `docs/src/internal/api-audit-2026-05-rep.md`.
1683    ///
1684    /// Combines `register_vlsn` with a push into the in-memory
1685    /// `peer_scanner` so that downstream replicas pulling from this
1686    /// node's `PEER_FEEDER` service (via `catch_up_from_peer`) can
1687    /// stream the entry without round-tripping through the on-disk
1688    /// log.  The local log is still the source of truth; the peer
1689    /// scanner is a fast-path cache that bounds itself via
1690    /// `PeerLogScanner::with_capacity` so old entries are evicted.
1691    ///
1692    /// Should be called by the master after the local commit has
1693    /// fsynced.  Calling on a non-master is harmless (the peer
1694    /// scanner cache is also used by replicas) but is logged at trace
1695    /// level for diagnostics.
1696    pub fn replicate_entry(
1697        &self,
1698        vlsn: u64,
1699        file_number: u32,
1700        file_offset: u32,
1701        entry_type: u8,
1702        data: Vec<u8>,
1703    ) {
1704        self.vlsn_index.register(vlsn, file_number, file_offset);
1705        self.peer_scanner.push(vlsn, entry_type, data);
1706        if !self.is_master() {
1707            log::trace!(
1708                "replicate_entry called on non-master node '{}': vlsn={}, type={}",
1709                self.config.node_name,
1710                vlsn,
1711                entry_type,
1712            );
1713        }
1714    }
1715
1716    /// Apply a replicated entry (as replica).
1717    ///
1718    /// Applies a log entry received from the master. This is called by the
1719    /// replica stream handler after receiving an entry from the feeder.
1720    ///
1721    /// `data` is the wire-encoded log-record payload.  When the
1722    /// replicated environment has not been wired to a local
1723    /// `noxu_db::Environment` (i.e., before `with_environment` is
1724    /// called) the payload is forwarded into the in-memory peer
1725    /// scanner so that downstream replicas attached to the
1726    /// `PEER_FEEDER` service can re-stream it; the local log is **not**
1727    /// updated.  This is documented behaviour rather than a stub — see
1728    /// `api-audit-2026-05-rep.md` finding #26 (medium) for the
1729    /// `with_environment`-required local-apply path.
1730    /// cleanup (rep info F35: `_data` placeholder) renames the leading
1731    /// underscore so reviewers don't read it as a TODO.
1732    pub fn apply_entry(
1733        &self,
1734        vlsn: u64,
1735        entry_type: u8,
1736        data: Vec<u8>,
1737    ) -> Result<()> {
1738        if self.is_shutdown() {
1739            return Err(RepError::StateError(
1740                "Cannot apply entry: environment is closed".to_string(),
1741            ));
1742        }
1743
1744        // Register the VLSN in the index.
1745        self.vlsn_index.register(vlsn, 0, 0);
1746
1747        // Push into the peer log scanner so downstream replicas can
1748        // receive this entry via the PEER_FEEDER service.
1749        self.peer_scanner.push(vlsn, entry_type, data);
1750
1751        log::trace!(
1752            "Applied replicated entry: vlsn={}, type={}",
1753            vlsn,
1754            entry_type
1755        );
1756        Ok(())
1757    }
1758
1759    /// Record an ack from a replica (as master).
1760    ///
1761    /// Records that the specified replica has acknowledged processing up to
1762    /// the given VLSN. This is used by the master to track durability
1763    /// guarantees.
1764    pub fn record_ack(&self, vlsn: u64, replica_name: &str) {
1765        self.ack_tracker.record_ack(vlsn, replica_name);
1766    }
1767
1768    /// Set the state change listener.
1769    ///
1770    ///
1771    ///
1772    /// Sets the listener used to receive asynchronous replication node state
1773    /// change events. Note that there is one listener per replication node,
1774    /// not one per handle. Invoking this method adds to the set of listeners.
1775    ///
1776    /// Invoking this method typically results in an immediate callback to the
1777    /// application via the `on_state_change` method, so that the application
1778    /// is made aware of the existing state of the node at the time the listener
1779    /// is first established.
1780    pub fn set_state_change_listener(
1781        &self,
1782        listener: Arc<dyn StateChangeListener>,
1783    ) {
1784        // Immediately notify the listener of the current state
1785        let current_state = self.node_state.get_state();
1786        let event = StateChangeEvent::new(
1787            current_state,
1788            current_state,
1789            self.get_master_name(),
1790        );
1791        listener.on_state_change(event);
1792
1793        let mut listeners = self.listeners.write();
1794        listeners.push(listener);
1795    }
1796
1797    /// Close the replicated environment.
1798    ///
1799    ///
1800    ///
1801    /// Closes this handle and releases any resources. When closed, daemon
1802    /// threads are stopped, even if they are performing work. The node ceases
1803    /// participation in the replication group. If the node was currently the
1804    /// master, the rest of the group will hold an election.
1805    ///
1806    /// The ReplicatedEnvironment should not be closed while any other type of
1807    /// handle that refers to it is not yet closed.
1808    pub fn close(&self) -> Result<()> {
1809        if self.shutdown.swap(true, Ordering::SeqCst) {
1810            // Already closed
1811            return Ok(());
1812        }
1813
1814        let old_state = self.node_state.get_state();
1815
1816        // Transition to Shutdown state. The state machine allows this from
1817        // any non-Shutdown state.
1818        let _ = self.node_state.transition_to(NodeState::Shutdown);
1819
1820        // Notify listeners of the shutdown
1821        self.notify_listeners(old_state, NodeState::Shutdown);
1822
1823        // Clear feeders
1824        {
1825            let mut feeders = self.feeders.write();
1826            feeders.clear();
1827        }
1828
1829        // Signal and join all I/O threads spawned by become_master /
1830        // become_replica / start_vlsn_persistence_daemon.  The vlsn-flush
1831        // thread does a final flush on its way out so a clean close is
1832        // recoverable.  Closes finding F11.
1833        self.io_shutdown.store(true, Ordering::SeqCst);
1834        {
1835            let mut threads = self.io_threads.lock().unwrap();
1836            for handle in threads.drain(..) {
1837                let _ = handle.join();
1838            }
1839        }
1840
1841        // Belt-and-braces: even when no daemon is running (e.g.
1842        // `ReplicatedEnvironment::new` without `open`), persist a final
1843        // snapshot if env_home is configured.
1844        if let Some(ref home) = self.config.env_home
1845            && let Err(e) =
1846                crate::vlsn::persist::flush_to_disk(&self.vlsn_index, home)
1847        {
1848            log::warn!(
1849                "close: failed to persist VLSN index to {}: {}",
1850                home.display(),
1851                e
1852            );
1853        }
1854
1855        // Stop the TCP service dispatcher (the: serviceDispatcher.shutdown()).
1856        if let Some(ref dispatcher) = self.tcp_dispatcher {
1857            dispatcher.stop();
1858            log::debug!(
1859                "Node '{}' TCP service dispatcher stopped",
1860                self.config.node_name.as_str()
1861            );
1862        }
1863
1864        log::info!(
1865            "Replicated environment '{}' in group '{}' closed",
1866            self.config.node_name.as_str(),
1867            self.config.group_name.as_str()
1868        );
1869
1870        Ok(())
1871    }
1872
1873    /// Close this handle and shut down the Replication Group by forcing all
1874    /// active Replicas to exit.
1875    ///
1876    ///
1877    ///
1878    /// This method must be invoked on the node that's currently the Master
1879    /// after all other outstanding handles have been closed. The Master waits
1880    /// for all active Replicas to catch up so that they have a current set of
1881    /// logs, and then shuts them down.
1882    pub fn shutdown_group(
1883        &self,
1884        replica_shutdown_timeout_ms: u64,
1885    ) -> Result<()> {
1886        if !self.is_master() {
1887            return Err(RepError::InvalidState(
1888                "shutdownGroup must be invoked on the master".to_string(),
1889            ));
1890        }
1891
1892        log::info!(
1893            "Node '{}' shutting down replication group '{}' (replica_timeout={}ms)",
1894            self.config.node_name.as_str(),
1895            self.config.group_name.as_str(),
1896            replica_shutdown_timeout_ms,
1897        );
1898
1899        // Closes finding F8 of `docs/src/internal/api-audit-2026-05-rep.md`.
1900        //
1901        // Send SHUTDOWN_GROUP to every known peer.  The recipient calls
1902        // its own `close()` and the per-connection ADMIN handler
1903        // returns ACK_OK.  Any peer that doesn't ack within the
1904        // timeout is logged and the master proceeds.  After signalling
1905        // every peer, the master closes its own env.
1906        let deadline = std::time::Instant::now()
1907            + Duration::from_millis(replica_shutdown_timeout_ms);
1908
1909        for peer in self.group_service.get_all_nodes() {
1910            if peer.name == self.config.node_name {
1911                continue;
1912            }
1913            // Don't exceed the deadline waiting for any single peer.
1914            let now = std::time::Instant::now();
1915            if now >= deadline {
1916                log::warn!(
1917                    "shutdown_group: deadline reached; skipping remaining peers"
1918                );
1919                break;
1920            }
1921            let addr_str = format!("{}:{}", peer.host, peer.port);
1922            let addr = match addr_str.parse::<SocketAddr>() {
1923                Ok(a) => a,
1924                Err(e) => {
1925                    log::warn!(
1926                        "shutdown_group: peer '{}' has bad address {}: {}",
1927                        peer.name,
1928                        addr_str,
1929                        e
1930                    );
1931                    continue;
1932                }
1933            };
1934            match crate::group_admin::send_shutdown_group(addr) {
1935                Ok(true) => log::info!(
1936                    "shutdown_group: peer '{}' acknowledged",
1937                    peer.name
1938                ),
1939                Ok(false) => log::warn!(
1940                    "shutdown_group: peer '{}' rejected the request",
1941                    peer.name
1942                ),
1943                Err(e) => log::warn!(
1944                    "shutdown_group: peer '{}' unreachable: {}",
1945                    peer.name,
1946                    e
1947                ),
1948            }
1949        }
1950
1951        // Master closes itself last.
1952        self.close()
1953    }
1954
1955    /// Check if shutdown is in progress.
1956    pub fn is_shutdown(&self) -> bool {
1957        self.shutdown.load(Ordering::SeqCst)
1958    }
1959
1960    /// Notify all registered listeners of a state change.
1961    fn notify_listeners(&self, old_state: NodeState, new_state: NodeState) {
1962        let listeners = self.listeners.read();
1963        if !listeners.is_empty() {
1964            let event = StateChangeEvent::new(
1965                old_state,
1966                new_state,
1967                self.get_master_name(),
1968            );
1969            for listener in listeners.iter() {
1970                listener.on_state_change(event.clone());
1971            }
1972        }
1973    }
1974}
1975
1976// ---------------------------------------------------------------------------
1977// F1: ReplicaAckCoordinator impl wires master commits into the AckTracker.
1978// ---------------------------------------------------------------------------
1979//
1980// `noxu_db::Transaction::commit_with_durability` calls
1981// `await_replica_acks` after the local WAL fsync.  This impl:
1982//
1983//   1. Rejects calls on a non-master node with `NotMaster`.
1984//   2. Rejects calls during shutdown with `Shutdown`.
1985//   3. Computes the required ack count from `electable_count` and the
1986//      requested policy.
1987//   4. Allocates a unique commit sequence number, registers the ack
1988//      requirement on the `AckTracker`, and polls `is_satisfied` with
1989//      a small sleep until either the timeout elapses or the policy
1990//      is satisfied.
1991//   5. Cleans up the tracker entry on every exit path.
1992//
1993// Closes finding F1 of `docs/src/internal/api-audit-2026-05-rep.md`.
1994impl ReplicaAckCoordinator for ReplicatedEnvironment {
1995    fn await_replica_acks(
1996        &self,
1997        policy: ReplicaAckPolicyKind,
1998        timeout: Duration,
1999    ) -> std::result::Result<u32, AckWaitError> {
2000        // Fast-path: ReplicaAckPolicy::None never blocks. The trait spec
2001        // says callers may already short-circuit, but be defensive.
2002        if matches!(policy, ReplicaAckPolicyKind::None) {
2003            return Ok(0);
2004        }
2005
2006        if self.is_shutdown() {
2007            return Err(AckWaitError {
2008                kind: AckWaitErrorKind::Shutdown,
2009                needed: 0,
2010                received: 0,
2011            });
2012        }
2013
2014        if !self.is_master() {
2015            return Err(AckWaitError {
2016                kind: AckWaitErrorKind::NotMaster,
2017                needed: 0,
2018                received: 0,
2019            });
2020        }
2021
2022        // Count electable peers (excluding the master) using the
2023        // RepGroup view, which counts Arbiters and Electables
2024        // identically. Only Electable nodes are counted as data
2025        // replicas able to ack a commit.  The master itself is
2026        // *implicit*: it is not registered in `group_service` (only
2027        // peers are), so we add 1 to obtain the total electable
2028        // count expected by `ReplicaAckPolicyKind::required_acks`.
2029        let group = self.get_rep_group();
2030        let electable_peers: u32 = group
2031            .get_nodes()
2032            .iter()
2033            .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2034            .count() as u32;
2035        let electable_count: u32 = electable_peers + 1; // +1 for self/master
2036
2037        let needed = policy.required_acks(electable_count);
2038        if needed == 0 {
2039            // Single-node group, or All with only the master itself.
2040            return Ok(0);
2041        }
2042
2043        let commit_seq = self
2044            .commit_ack_seq
2045            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2046        self.ack_tracker.register(commit_seq, needed);
2047
2048        // Poll-with-sleep loop. The poll interval is small enough that
2049        // late acks satisfy the policy promptly, and large enough that
2050        // a single commit waiting on a slow replica does not spin a
2051        // CPU.
2052        let poll_interval =
2053            std::cmp::min(timeout / 50, Duration::from_millis(20));
2054        let poll_interval = if poll_interval.is_zero() {
2055            Duration::from_millis(1)
2056        } else {
2057            poll_interval
2058        };
2059        let deadline = std::time::Instant::now() + timeout;
2060
2061        loop {
2062            if self.ack_tracker.is_satisfied(commit_seq) {
2063                self.ack_tracker.cleanup_through(commit_seq);
2064                return Ok(needed);
2065            }
2066            if self.is_shutdown() {
2067                self.ack_tracker.cleanup_through(commit_seq);
2068                return Err(AckWaitError {
2069                    kind: AckWaitErrorKind::Shutdown,
2070                    needed,
2071                    received: 0,
2072                });
2073            }
2074            let now = std::time::Instant::now();
2075            if now >= deadline {
2076                // Tear down the registration so it doesn't accumulate;
2077                // record the partial ack count so the caller can report
2078                // a useful `InsufficientReplicas { required, available }`.
2079                let received =
2080                    self.ack_tracker.received_count(commit_seq).unwrap_or(0);
2081                self.ack_tracker.cleanup_through(commit_seq);
2082                return Err(AckWaitError {
2083                    kind: AckWaitErrorKind::Timeout,
2084                    needed,
2085                    received,
2086                });
2087            }
2088            let sleep_for = std::cmp::min(
2089                poll_interval,
2090                deadline.saturating_duration_since(now),
2091            );
2092            std::thread::sleep(sleep_for);
2093        }
2094    }
2095
2096    /// X-3: allocate the next VLSN for a recovered XA commit and register
2097    /// `lsn` in the VLSN index so feeders can stream the commit.
2098    ///
2099    /// Increments off the current latest VLSN so the new VLSN is strictly
2100    /// monotonically increasing.  In a single-node or master-less environment
2101    /// (not master) returns 0 (NULL_VLSN — harmless, the default).
2102    fn alloc_vlsn_for_recovered_commit(&self, lsn: noxu_util::Lsn) -> u64 {
2103        // Only allocate a VLSN when we are the master; on a replica the
2104        // recovered XA should have been replicated by the original master.
2105        if !self.is_master() {
2106            return 0;
2107        }
2108        let next_vlsn = self.vlsn_index.get_latest_vlsn() + 1;
2109        self.vlsn_index.register(
2110            next_vlsn,
2111            lsn.file_number(),
2112            lsn.file_offset(),
2113        );
2114        log::debug!(
2115            "alloc_vlsn_for_recovered_commit: allocated vlsn={} for lsn={:?}",
2116            next_vlsn,
2117            lsn
2118        );
2119        next_vlsn
2120    }
2121}
2122
#[cfg(test)]
mod tests {
    //! Unit tests for `ReplicatedEnvironment`.
    //!
    //! Most tests build an environment with [`test_config`] (fixed port) and
    //! only exercise in-process state. Tests that talk to the TCP dispatcher
    //! use [`test_config_port0`] so the OS picks a free port.

    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};

    /// Helper to create a test config with a fixed port (unit-test style,
    /// no real TCP bind needed — hostname "localhost" resolves but the port
    /// might be in use; use `test_config_port0` for real TCP tests).
    fn test_config(node_name: &str) -> RepConfig {
        RepConfig::builder("test_group", node_name, "localhost")
            .node_port(5001)
            .build()
    }

    /// Helper to create a test config that binds to an OS-assigned port.
    fn test_config_port0(node_name: &str) -> RepConfig {
        RepConfig::builder("test_group", node_name, "127.0.0.1")
            .node_port(0)
            .build()
    }

    /// A freshly constructed environment reports `Detached` and is neither
    /// master, replica, nor active.
    #[test]
    fn test_initial_state_is_detached() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        // NodeStateMachine starts in Detached state
        assert_eq!(env.get_state(), NodeState::Detached);
        assert!(!env.is_master());
        assert!(!env.is_replica());
        assert!(!env.is_active());
    }

    /// `become_master` moves the node to `Master` and marks it active.
    #[test]
    fn test_become_master() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.become_master(1).unwrap();
        assert_eq!(env.get_state(), NodeState::Master);
        assert!(env.is_master());
        assert!(!env.is_replica());
        assert!(env.is_active());
    }

    /// `become_replica` moves the node to `Replica` and marks it active.
    #[test]
    fn test_become_replica() {
        let env = ReplicatedEnvironment::new(test_config("node2")).unwrap();
        env.become_replica("node1").unwrap();
        assert_eq!(env.get_state(), NodeState::Replica);
        assert!(!env.is_master());
        assert!(env.is_replica());
        assert!(env.is_active());
    }

    /// The node name comes straight from the config.
    #[test]
    fn test_get_node_name() {
        let env = ReplicatedEnvironment::new(test_config("my_node")).unwrap();
        assert_eq!(env.get_node_name(), "my_node");
    }

    /// The group name comes straight from the config.
    #[test]
    fn test_get_group_name() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(env.get_group_name(), "test_group");
    }

    /// Registering VLSNs 1..=3 advances the current VLSN and the range.
    #[test]
    fn test_register_vlsn_updates_index() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.register_vlsn(1, 0, 100);
        env.register_vlsn(2, 0, 200);
        env.register_vlsn(3, 0, 300);

        assert_eq!(env.get_current_vlsn(), 3);
        let range = env.get_vlsn_range();
        assert_eq!(range.first(), 1);
        assert_eq!(range.last(), 3);
    }

    /// A single recorded ack satisfies a pending requirement of one ack.
    #[test]
    fn test_record_ack() {
        let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
        env.become_master(1).unwrap();

        env.register_vlsn(1, 0, 100);
        // Register a pending ack requirement, then record ack
        env.get_ack_tracker().register(1, 1);
        env.record_ack(1, "replica1");
        // Ack should be satisfied
        assert!(env.get_ack_tracker().is_satisfied(1));
    }

    /// `close` flips the shutdown flag and moves the node to `Shutdown`.
    #[test]
    fn test_close_sets_shutdown() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert!(!env.is_shutdown());

        env.close().unwrap();
        assert!(env.is_shutdown());
        // After close, state should be Shutdown
        assert_eq!(env.get_state(), NodeState::Shutdown);
    }

    /// Calling `close` twice succeeds both times.
    #[test]
    fn test_close_is_idempotent() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.close().unwrap();
        env.close().unwrap(); // Should not error
        assert!(env.is_shutdown());
    }

    /// `become_master` is rejected after `close`.
    #[test]
    fn test_cannot_become_master_when_shutdown() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.close().unwrap();

        let result = env.become_master(1);
        assert!(result.is_err());
    }

    /// `become_replica` is rejected after `close`.
    #[test]
    fn test_cannot_become_replica_when_shutdown() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.close().unwrap();

        let result = env.become_replica("master");
        assert!(result.is_err());
    }

    /// `apply_entry` is rejected after `close`.
    #[test]
    fn test_cannot_apply_entry_when_shutdown() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.close().unwrap();

        let result = env.apply_entry(1, 0, vec![1, 2, 3]);
        assert!(result.is_err());
    }

    /// A replica cannot initiate a master transfer.
    #[test]
    fn test_cannot_transfer_master_when_not_master() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.become_replica("other").unwrap();

        let config = MasterTransferConfig::new(
            "target_node".to_string(),
            Duration::from_secs(30),
        );
        let result = env.transfer_master(config);
        assert!(result.is_err());
    }

    /// A master transfer to a target that was never registered must fail.
    #[test]
    fn test_transfer_master_requires_registered_target() {
        // F7: transfer_master is no longer a no-op; it sends an ADMIN
        // TRANSFER_MASTER signal to the target via TCP.  An unregistered
        // target is rejected at the address-resolution step.
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.become_master(1).unwrap();

        let config = MasterTransferConfig::new(
            "unknown_target".to_string(),
            Duration::from_secs(30),
        );
        let result = env.transfer_master(config);
        assert!(
            result.is_err(),
            "transfer_master to unregistered target must error"
        );
    }

    /// Applying entries on a replica advances the current VLSN.
    #[test]
    fn test_apply_entry_registers_vlsn() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.become_replica("master").unwrap();

        env.apply_entry(1, 0, vec![1, 2, 3]).unwrap();
        env.apply_entry(2, 0, vec![4, 5, 6]).unwrap();

        assert_eq!(env.get_current_vlsn(), 2);
    }

    /// The master name is unknown at first and becomes this node's name
    /// once it is master.
    #[test]
    fn test_master_name_tracking() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        // Initially no master known
        assert!(env.get_master_name().is_none());

        // After becoming master, this node is the master
        env.become_master(1).unwrap();
        assert_eq!(env.get_master_name(), Some("node1".to_string()));
    }

    /// A master can step down to replica; the tracked master name follows.
    #[test]
    fn test_master_to_replica_transition() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        // Become master first
        env.become_master(1).unwrap();
        assert_eq!(env.get_master_name(), Some("node1".to_string()));

        // Transition to replica (Master -> Replica is valid)
        env.become_replica("other_master").unwrap();
        assert_eq!(env.get_master_name(), Some("other_master".to_string()));
        assert!(env.is_replica());
    }

    /// A listener is notified once on registration and once per subsequent
    /// state change.
    #[test]
    fn test_state_change_listener_notification() {
        struct TestListener {
            call_count: AtomicU32,
            last_new_state: noxu_sync::Mutex<Option<NodeState>>,
        }

        impl StateChangeListener for TestListener {
            fn on_state_change(&self, event: StateChangeEvent) {
                self.call_count.fetch_add(1, AtomicOrdering::SeqCst);
                *self.last_new_state.lock() = Some(event.new_state);
            }
        }

        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        let listener = Arc::new(TestListener {
            call_count: AtomicU32::new(0),
            last_new_state: noxu_sync::Mutex::new(None),
        });

        // Setting the listener should trigger an immediate notification
        env.set_state_change_listener(listener.clone());
        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 1);

        // State change should trigger another notification
        env.become_master(1).unwrap();
        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 2);
        assert_eq!(*listener.last_new_state.lock(), Some(NodeState::Master));
    }

    /// `close` delivers a `Shutdown` event to the registered listener.
    #[test]
    fn test_close_notifies_listeners() {
        struct ShutdownListener {
            shutdown_seen: AtomicBool,
        }

        impl StateChangeListener for ShutdownListener {
            fn on_state_change(&self, event: StateChangeEvent) {
                if event.new_state == NodeState::Shutdown {
                    self.shutdown_seen.store(true, AtomicOrdering::SeqCst);
                }
            }
        }

        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        let listener = Arc::new(ShutdownListener {
            shutdown_seen: AtomicBool::new(false),
        });

        // The initial notification is for the current (Detached) state
        env.set_state_change_listener(listener.clone());

        // Become master first so the close transition is meaningful
        env.become_master(1).unwrap();
        assert!(!listener.shutdown_seen.load(AtomicOrdering::SeqCst));

        env.close().unwrap();
        assert!(listener.shutdown_seen.load(AtomicOrdering::SeqCst));
    }

    /// A replica cannot shut the group down.
    #[test]
    fn test_shutdown_group_requires_master() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.become_replica("other").unwrap();

        let result = env.shutdown_group(5000);
        assert!(result.is_err());
    }

    /// A master can shut the group down, which also shuts this node down.
    #[test]
    fn test_shutdown_group_as_master() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.become_master(1).unwrap();

        let result = env.shutdown_group(5000);
        assert!(result.is_ok());
        assert!(env.is_shutdown());
    }

    /// `get_config` exposes the node and group names it was built with.
    #[test]
    fn test_get_config() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(env.get_config().node_name, "node1");
        assert_eq!(env.get_config().group_name, "test_group");
    }

    /// Smoke test: `get_stats` can be called on a fresh environment.
    #[test]
    fn test_get_stats() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        let _stats = env.get_stats();
        // Just verify we can access stats without panicking
    }

    // -----------------------------------------------------------------------
    // TCP dispatcher tests (H-5 / H-7)
    // -----------------------------------------------------------------------

    /// Constructing the environment with port 0 yields a bound address with
    /// a concrete, non-zero port.
    #[test]
    fn test_tcp_dispatcher_starts_on_new() {
        // Use port 0 so the OS assigns an ephemeral port.
        let env =
            ReplicatedEnvironment::new(test_config_port0("tcp_node")).unwrap();
        // The dispatcher must have started and bound a real port.
        let addr = env.bound_addr().expect("expected a bound address");
        assert_ne!(addr.port(), 0, "OS should assign a non-zero port");
    }

    /// The dispatcher reports running after construction and stopped after
    /// `close`.
    #[test]
    fn test_tcp_dispatcher_stops_on_close() {
        let env =
            ReplicatedEnvironment::new(test_config_port0("tcp_node2")).unwrap();
        // A missing dispatcher counts as "not running".
        let dispatcher_running =
            || env.tcp_dispatcher.as_ref().is_some_and(|d| d.is_running());

        // Dispatcher is running.
        assert!(dispatcher_running());

        env.close().unwrap();

        // After close, dispatcher must be stopped.
        assert!(
            !dispatcher_running(),
            "dispatcher should be stopped after close"
        );
    }

    /// A handler registered on the running dispatcher receives a client
    /// connection and can echo a message back.
    #[test]
    fn test_tcp_dispatcher_accepts_connection() {
        use crate::net::Channel;
        use crate::net::ServiceHandler;
        use crate::net::service_dispatcher::connect_to_service;

        struct PingHandler {
            count: AtomicU32,
        }
        impl ServiceHandler for PingHandler {
            fn service_name(&self) -> &str {
                "ping"
            }
            fn handle(&self, ch: Box<dyn Channel>) -> crate::error::Result<()> {
                self.count.fetch_add(1, AtomicOrdering::SeqCst);
                // Echo the first message back.
                if let Ok(Some(msg)) = ch.receive(Duration::from_secs(2)) {
                    let _ = ch.send(&msg);
                }
                Ok(())
            }
        }

        let env =
            ReplicatedEnvironment::new(test_config_port0("tcp_node3")).unwrap();
        let addr = env.bound_addr().expect("dispatcher must be bound");

        // Fail loudly instead of silently skipping every assertion below
        // when the dispatcher is absent.
        let disp = env
            .tcp_dispatcher
            .as_ref()
            .expect("tcp_dispatcher must be present");

        // Register a ping handler on the running dispatcher.
        let handler = Arc::new(PingHandler { count: AtomicU32::new(0) });
        disp.register("ping", handler.clone());

        // Give the accept thread a moment.
        std::thread::sleep(Duration::from_millis(20));

        let client = connect_to_service(addr, "ping").unwrap();
        client.send(b"hello").unwrap();
        let reply = client.receive(Duration::from_secs(2)).unwrap();
        assert_eq!(reply, Some(b"hello".to_vec()));

        // The handler bumps its counter before echoing, so the count is
        // already visible once the reply has arrived.
        assert_eq!(handler.count.load(AtomicOrdering::SeqCst), 1);

        env.close().unwrap();
    }

    #[test]
    fn test_become_master_auto_transitions_from_detached() {
        // The state machine requires Detached -> Unknown -> Master.
        // become_master() should handle this automatically.
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(env.get_state(), NodeState::Detached);
        env.become_master(1).unwrap();
        assert_eq!(env.get_state(), NodeState::Master);
    }

    #[test]
    fn test_become_replica_auto_transitions_from_detached() {
        // The state machine requires Detached -> Unknown -> Replica.
        // become_replica() should handle this automatically.
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(env.get_state(), NodeState::Detached);
        env.become_replica("master_node").unwrap();
        assert_eq!(env.get_state(), NodeState::Replica);
    }

    /// A node that was master but has been closed cannot transfer
    /// mastership.
    #[test]
    fn test_cannot_transfer_master_when_shutdown() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        env.become_master(1).unwrap();
        env.close().unwrap();

        let config = MasterTransferConfig::new(
            "target".to_string(),
            Duration::from_secs(30),
        );
        let result = env.transfer_master(config);
        assert!(result.is_err());
    }

    /// Walks one node through detached -> master -> replica -> closed,
    /// registering VLSNs, recording acks and applying an entry on the way.
    #[test]
    fn test_full_lifecycle() {
        let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        // Start as detached
        assert_eq!(env.get_state(), NodeState::Detached);

        // Become master
        env.become_master(1).unwrap();
        assert!(env.is_master());

        // Register some VLSNs
        env.register_vlsn(1, 0, 100);
        env.register_vlsn(2, 0, 200);

        // Record ack from replica
        env.record_ack(1, "replica1");
        env.record_ack(2, "replica1");

        // Transition to replica (simulating failover)
        env.become_replica("node2").unwrap();
        assert!(env.is_replica());

        // Apply entries from new master
        env.apply_entry(3, 0, vec![7, 8, 9]).unwrap();

        // Close
        env.close().unwrap();
        assert!(env.is_shutdown());
    }

    /// Verify that `with_environment` lazily registers the RESTORE service on
    /// the TCP dispatcher when `config.env_home` was not set at construction.
    ///
    /// This mirrors `RepNode.envSetup()` which registers the restore handler
    /// when the environment is wired into the replicated node.
    #[test]
    fn test_restore_registered_lazily_via_with_environment() {
        use tempfile::TempDir;

        let dir = TempDir::new().expect("temp dir");

        // Build config WITHOUT env_home — dispatcher starts, but no RESTORE
        // handler yet.
        let config = RepConfig::builder("test_group", "node1", "127.0.0.1")
            .node_port(0)
            .build();

        let rep_env = ReplicatedEnvironment::new(config).unwrap();

        // Not yet registered.
        assert!(
            !rep_env.restore_registered.load(AtomicOrdering::SeqCst),
            "RESTORE must not be registered before with_environment"
        );

        // Wire in a real EnvironmentImpl so get_env_home() returns the temp
        // dir.
        let env_impl = Arc::new(
            EnvironmentImpl::new(dir.path(), false, false).expect("open env"),
        );
        rep_env.with_environment(env_impl);

        // Now the RESTORE service must be registered.
        assert!(
            rep_env.restore_registered.load(AtomicOrdering::SeqCst),
            "RESTORE must be registered after with_environment"
        );
    }

    /// Verify that when `config.env_home` IS set at construction, the RESTORE
    /// service is registered immediately (not deferred).
    #[test]
    fn test_restore_registered_eagerly_when_env_home_in_config() {
        use tempfile::TempDir;

        let dir = TempDir::new().expect("temp dir");

        let config = RepConfig::builder("test_group", "node2", "127.0.0.1")
            .node_port(0)
            .env_home(dir.path())
            .build();

        let rep_env = ReplicatedEnvironment::new(config).unwrap();

        // Should be registered immediately (env_home was in config).
        assert!(
            rep_env.restore_registered.load(AtomicOrdering::SeqCst),
            "RESTORE must be registered when env_home is in the config"
        );
    }
}