noxu_rep/replicated_environment.rs
1//! The main replicated environment API.
2//!
3//!
4//! A replicated database environment that is a node in a replication group.
5//! This is the entry point for replication. It wraps a standard Environment
6//! and adds replication capabilities including master election, replica
7//! streaming, and commit acknowledgments.
8//!
9//! # Replication node states
10//!
11//! The replication node state determines the operations that the application
12//! can perform against its replicated environment. The state transitions
13//! visible to the application can be summarized by the regular expression:
14//!
15//! ```text
16//! [ MASTER | REPLICA | UNKNOWN ]+ DETACHED
17//! ```
18//!
19//! When the first handle to a `ReplicatedEnvironment` is created and the node
20//! is brought up, the node usually establishes Master or Replica state. These
21//! states are preceded by the Unknown state. As various remote nodes become
22//! unavailable and elections are held, the local node may change between
23//! Master and Replica states, always with a (usually brief) transition through
24//! Unknown state.
25//!
26//! When the environment is closed, the node transitions to the Detached state.
27
28use noxu_dbi::{
29 AckWaitError, AckWaitErrorKind, EnvironmentImpl, ReplicaAckCoordinator,
30 ReplicaAckPolicyKind,
31};
32use noxu_sync::RwLock;
33use std::net::SocketAddr;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::OnceLock;
37use std::sync::Weak;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use crate::ack_tracker::AckTracker;
42use crate::elections::election_service::{
43 ELECTION_SERVICE_NAME, ElectionAcceptorState, ElectionService,
44};
45use crate::elections::master_tracker::MasterTracker;
46use crate::error::{RepError, Result};
47use crate::group_service::GroupService;
48use crate::master_transfer::MasterTransferConfig;
49use crate::net::service_dispatcher::{
50 AnyServiceDispatcher, TcpServiceDispatcher,
51};
52use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
53use crate::network_restore_server::{
54 NetworkRestoreServer, RESTORE_SERVICE_NAME,
55};
56use crate::node_state::{NodeState, NodeStateMachine};
57use crate::rep_config::RepConfig;
58use crate::rep_stats::RepStats;
59use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
60use crate::stream::feeder::EnvironmentLogScanner;
61use crate::stream::feeder::Feeder;
62use crate::stream::feeder::FeederRunner;
63use crate::stream::peer_feeder::PeerScannerAdapter;
64use crate::stream::peer_feeder::{
65 PEER_FEEDER_SERVICE_NAME, PeerFeederService, PeerLogScanner,
66};
67use crate::stream::replica_stream::{EnvironmentLogWriter, ReplicaStream};
68use crate::vlsn::vlsn_index::VlsnIndex;
69use crate::vlsn::vlsn_range::VlsnRange;
70use std::collections::HashMap;
71
/// Default heartbeat timeout for master liveness detection.
///
/// Passed to `MasterTracker::new` when a `ReplicatedEnvironment` is
/// constructed.
const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);
74
/// A replicated database environment.
///
/// This is the entry point for replication. It wraps a standard Environment
/// and adds replication capabilities including master election, replica
/// streaming, and commit acknowledgments.
///
/// High Availability (HA) provides a replicated, embedded database
/// management system which provides fast, reliable, and scalable data
/// management. HA enables replication of an environment across a Replication
/// Group. A `ReplicatedEnvironment` is a single node in the replication group.
///
/// `ReplicatedEnvironment` wraps a standard `Environment`. All database
/// operations are executed in the same fashion in both replicated and
/// non-replicated applications. A `ReplicatedEnvironment` must be
/// transactional. All replicated databases created in the replicated
/// environment must be transactional as well.
///
/// A handle created with [`ReplicatedEnvironment::open`] starts an election
/// driver that resolves the node into Master or Replica state.
/// [`ReplicatedEnvironment::new`] only constructs the handle (and starts the
/// service dispatcher); state transitions are then left to the caller.
///
/// Replicated environments can be created with node type Electable or
/// Secondary. Electable nodes can be masters or replicas, and participate in
/// both master elections and commit durability decisions. Secondary nodes can
/// only be replicas, not masters, and do not participate in either elections or
/// durability decisions.
///
/// # Example
///
/// ```ignore
/// use noxu_rep::{ReplicatedEnvironment, RepConfig};
///
/// let config = RepConfig::builder("my_group", "node1", "localhost")
///     .node_port(5001)
///     .build();
/// let rep_env = ReplicatedEnvironment::new(config).unwrap();
/// ```
pub struct ReplicatedEnvironment {
    /// The replication configuration for this node.
    config: RepConfig,
    /// Tracks the current node state (Detached, Unknown, Master, Replica).
    node_state: NodeStateMachine,
    /// Service for managing the replication group membership.
    group_service: GroupService,
    /// Maps VLSNs to log file positions.
    ///
    /// Wrapped in `Arc` so that background daemons (election driver,
    /// VLSN-index persistence flusher) can share access without
    /// borrowing the env. Closes finding F11 of the 2026 review.
    vlsn_index: Arc<VlsnIndex>,
    /// Tracks acknowledgments from replicas (used by master).
    ack_tracker: AckTracker,
    /// Replication statistics.
    stats: RepStats,
    /// Active feeder threads (master -> replica streams).
    feeders: RwLock<Vec<Feeder>>,
    /// Replica stream for receiving updates from the master.
    replica_stream: ReplicaStream,
    /// Tracks the current master node.
    master_tracker: MasterTracker,
    /// State change listeners.
    listeners: RwLock<Vec<Arc<dyn StateChangeListener>>>,
    /// Shutdown flag.
    shutdown: AtomicBool,
    /// Service dispatcher — listens on the replication port and routes
    /// incoming connections to the appropriate service handler (feeder, etc.).
    ///
    /// `Plain`: plain TCP (default / Phase-2 behaviour).
    /// `Tls`: TLS + mTLS enforcement (Phase 3, when `RepConfig::tls_config` is set
    /// and `transport_kind` is `Tls`).
    ///
    /// `None` when the listen address cannot be turned into a socket address
    /// or when the dispatcher fails to start (bind or TLS configuration
    /// error); both cases are logged by `new()`.
    tcp_dispatcher: Option<AnyServiceDispatcher>,
    /// The address the `tcp_dispatcher` is actually bound to (may differ from
    /// the configured port when port 0 is used in tests).
    bound_addr: Option<SocketAddr>,

    /// Optional live `EnvironmentImpl` wired in via [`with_environment`].
    ///
    /// When set, `become_master` spawns a `FeederRunner` per replica using
    /// `EnvironmentLogScanner`, and `become_replica` spawns a
    /// `ReplicaReceiver` thread using `EnvironmentLogWriter`.
    env_impl: StdMutex<Option<Arc<EnvironmentImpl>>>,

    /// Background thread handles (I/O threads spawned during state
    /// transitions, the election driver, the VLSN flush daemon).
    ///
    /// Stored so that `close()` can join them cleanly. The election driver
    /// and VLSN flush daemon also scan this list by thread name to stay
    /// one-per-env.
    io_threads: StdMutex<Vec<std::thread::JoinHandle<()>>>,

    /// Shutdown flag shared with I/O threads so they terminate when the
    /// environment is closed.
    io_shutdown: AtomicBool,

    /// Whether the RESTORE service has been registered on the TCP dispatcher.
    ///
    /// When `config.env_home` is `None` at construction time, registration is
    /// deferred until `with_environment()` provides the env home path.
    restore_registered: AtomicBool,

    /// In-memory log queue used by the peer feeder service.
    ///
    /// When this node is a replica, `apply_entry()` pushes each received log
    /// entry here. The `PeerFeederService` registered on the TCP dispatcher
    /// reads from this queue to stream entries to downstream replicas that
    /// are behind this node (peer-to-peer log distribution, HA style).
    peer_scanner: Arc<PeerLogScanner>,

    /// Durable Transaction VLSN (D7, JE RepNode.dtvlsn): the highest VLSN
    /// known to have been replicated to a *majority* of the electable
    /// replicas. On a master it is computed from feeder ack/heartbeat progress
    /// (`update_dtvlsn_from_feeders`); on a replica it is set from commit/abort
    /// records in the stream (`set_dtvlsn`). It advances monotonically (an
    /// `update_max`). 0 = NULL_VLSN. Used by the election ranking (D2) so the
    /// most-durable node, not merely the highest-raw-VLSN node, wins.
    dtvlsn: std::sync::atomic::AtomicU64,

    /// Shared acceptor state used by the ELECTION service handler.
    /// The election driver updates `own_vlsn` / `own_term` here as the
    /// node progresses; incoming acceptor sessions read it on every
    /// connection so their replies always reflect the local node's
    /// most recent state. Closes finding F6.
    election_state: Arc<ElectionAcceptorState>,

    /// Self-referential `Weak` populated once the env has been wrapped
    /// in an `Arc`. Used by the replica I/O thread spawned in
    /// `become_replica` so it can call `bootstrap_via_dispatcher` when
    /// the master signals `NeedsRestore`.
    ///
    /// Populated lazily via [`Self::init_self_weak`] from `open()` and
    /// the test harness. When unset (callers that build the env via
    /// raw `Arc::new(Self::new(...))` and never call `init_self_weak`)
    /// the I/O thread falls back to operator-driven bootstrap.
    self_weak: OnceLock<Weak<Self>>,

    // -----------------------------------------------------------------------
    // C-C2: active push-feeder infrastructure
    // -----------------------------------------------------------------------
    /// Per-replica channels injected via [`Self::register_feeder_channel`].
    ///
    /// When [`Self::become_master`] is called (or when the node is already
    /// master), a [`FeederRunner`] thread is spawned for each registered
    /// channel, actively streaming entries to that replica over the channel.
    ///
    /// Using `register_feeder_channel` is the primary integration point for
    /// the push-based feeder path. Production deployments wire in a
    /// `TcpChannel`; test code uses `LocalChannelPair`.
    feeder_channels: StdMutex<HashMap<String, Arc<dyn crate::net::Channel>>>,

    /// Per-replica dedicated entry queues backing the push-feeder path.
    ///
    /// Each `FeederRunner` thread reads exclusively from its replica's queue.
    /// [`Self::replicate_entry`] and [`Self::apply_entry`] fan out into all
    /// registered queues so the push runners receive entries without competing
    /// with [`PeerFeederService`] for ownership of `peer_scanner`.
    feeder_queues: std::sync::RwLock<HashMap<String, Arc<PeerLogScanner>>>,

    /// Active `FeederRunner` references for acked-VLSN queries and
    /// clean shutdown (M-4: wait for replicas to catch up).
    active_feeder_runners: StdMutex<HashMap<String, Arc<FeederRunner>>>,

    /// Monotone VLSN counter shared with the wired `EnvironmentImpl`.
    ///
    /// Installed into the environment via
    /// `EnvironmentImpl::set_replication_vlsn_counter()` when
    /// `with_environment` is called. Each `log_txn_commit` on the master
    /// atomically increments this counter and writes a VLSN-tagged WAL entry,
    /// which `EnvironmentLogScanner` then picks up without any
    /// `replicate_entry` call from the application.
    wal_vlsn_counter: Arc<std::sync::atomic::AtomicU64>,
}
251
252impl ReplicatedEnvironment {
253 /// Create a new replicated environment.
254 ///
255 ///
256 ///
257 /// Creates a replicated environment handle and starts participating in the
258 /// replication group. The node's state is determined when it joins the
259 /// group, and mastership is not preconfigured. If the group has no current
260 /// master, creation will trigger an election to determine whether this node
261 /// will participate as a Master or a Replica.
262 ///
263 /// A brand new node will always join an existing group as a Replica, unless
264 /// it is the very first electable node that is creating the group. In that
265 /// case it joins as the Master of the newly formed singleton group.
266 pub fn new(config: RepConfig) -> Result<Self> {
267 // mTLS Phase 2 (v3.1.0): peer_allowlist enforcement is real at the
268 // TLS channel layer (TlsTcpChannelListener::bind_with_tls_and_allowlist).
269 // Phase 3 (this release): when RepConfig::tls_config is set AND
270 // transport_kind is Tls, the service dispatcher itself enforces mTLS
271 // via TlsTcpServiceDispatcher. For the remaining cases (no TlsConfig
272 // or non-TLS transport) keep the Phase-2 accurate warn.
273 if !config.peer_allowlist.is_empty() {
274 match config.transport_kind {
275 crate::rep_config::RepTransportKind::Tls => {
276 if config.tls_config.is_some() {
277 log::info!(
278 "[{}] peer_allowlist ({} entries) + tls_config set; \
279 mTLS will be enforced on the service dispatcher.",
280 config.node_name,
281 config.peer_allowlist.len(),
282 );
283 } else {
284 log::info!(
285 "[{}] peer_allowlist configured ({} entries) but \
286 tls_config is None — the service dispatcher will \
287 use plain TCP. Set RepConfig::tls_config to \
288 activate end-to-end mTLS on this path.",
289 config.node_name,
290 config.peer_allowlist.len(),
291 );
292 }
293 }
294 _ => {
295 log::warn!(
296 "[{}] peer_allowlist is configured ({} entries) but \
297 transport_kind is not Tls — the allowlist has no \
298 effect without TLS transport. Set \
299 RepTransportKind::Tls to activate mTLS enforcement.",
300 config.node_name,
301 config.peer_allowlist.len(),
302 );
303 }
304 }
305 }
306 let node_state = NodeStateMachine::new();
307 let group_service = GroupService::new(config.group_name.clone());
308 let vlsn_index = {
309 // F11: try to load a previously persisted vlsn.idx from
310 // env_home if one exists. A successfully loaded index lets a
311 // restarted replica resume from where it left off without a
312 // full network restore; a missing or corrupt file falls back
313 // to a fresh in-memory index (caller will need to bootstrap).
314 if let Some(ref home) = config.env_home {
315 match crate::vlsn::persist::load_from_disk(home) {
316 Ok(Some(idx)) => {
317 log::info!(
318 "Node '{}' loaded persisted VLSN index from {} \
319 ({} entries, latest vlsn={})",
320 config.node_name,
321 home.display(),
322 idx.snapshot_entries().len(),
323 idx.get_latest_vlsn(),
324 );
325 Arc::new(idx)
326 }
327 Ok(None) => Arc::new(VlsnIndex::new(10)),
328 Err(e) => {
329 log::warn!(
330 "Node '{}' failed to load persisted VLSN index \
331 from {}: {} (treating as fresh node — network \
332 restore required)",
333 config.node_name,
334 home.display(),
335 e
336 );
337 // Best-effort: remove the corrupt file so the
338 // next persist cycle writes a clean one. A
339 // missing file is the "fresh node" baseline.
340 let _ = std::fs::remove_file(
341 crate::vlsn::persist::index_path(home),
342 );
343 Arc::new(VlsnIndex::new(10))
344 }
345 }
346 } else {
347 Arc::new(VlsnIndex::new(10))
348 }
349 };
350 let ack_tracker = AckTracker::new();
351 let stats = RepStats::new();
352 let feeders = RwLock::new(Vec::new());
353 let replica_stream = ReplicaStream::new();
354 let master_tracker = MasterTracker::new(DEFAULT_HEARTBEAT_TIMEOUT);
355
356 // Start the service dispatcher.
357 //
358 // Phase 3: when RepConfig::tls_config is set AND transport_kind is Tls,
359 // start a TlsTcpServiceDispatcher (mTLS enforced). Otherwise fall back
360 // to the plain-TCP TcpServiceDispatcher.
361 let listen_addr_str =
362 format!("{}:{}", config.node_host, config.node_port);
363 let mut restore_registered_init = false;
364
365 // Returns (AnyServiceDispatcher, bound_addr) or (None, None) on error.
366 let (tcp_dispatcher, bound_addr) = match listen_addr_str
367 .parse::<SocketAddr>()
368 {
369 Ok(addr) => {
370 let build_result: Result<(AnyServiceDispatcher, SocketAddr)> =
371 Self::build_dispatcher(&config, addr);
372 match build_result {
373 Ok((dispatcher, bound)) => {
374 // Register the network restore handler.
375 if let Some(ref home) = config.env_home {
376 let restore_server =
377 NetworkRestoreServer::new(home.clone());
378 dispatcher.register(
379 RESTORE_SERVICE_NAME,
380 Arc::new(restore_server),
381 );
382 log::debug!(
383 "Node '{}' RESTORE service registered \
384 (env_home={})",
385 config.node_name,
386 home.display(),
387 );
388 restore_registered_init = true;
389 }
390 let kind =
391 if dispatcher.is_tls() { "TLS" } else { "TCP" };
392 log::info!(
393 "Node '{}' {} service dispatcher started on {}",
394 config.node_name,
395 kind,
396 bound
397 );
398 (Some(dispatcher), Some(bound))
399 }
400 Err(e) => {
401 log::warn!(
402 "Node '{}' failed to start service dispatcher \
403 on {}: {}",
404 config.node_name,
405 listen_addr_str,
406 e
407 );
408 (None, None)
409 }
410 }
411 }
412 Err(e) => {
413 log::warn!(
414 "Node '{}' cannot parse listen address '{}': {}",
415 config.node_name,
416 listen_addr_str,
417 e
418 );
419 (None, None)
420 }
421 };
422
423 // Build the in-memory peer log scanner; register the peer feeder
424 // service on the dispatcher so downstream replicas can connect.
425 let peer_scanner = Arc::new(PeerLogScanner::new());
426 // F5/F31: build the acceptor state with persistence enabled when
427 // env_home is configured. Crash-durable promises are required
428 // for the Paxos safety invariant after a process restart.
429 let election_state =
430 Arc::new(if let Some(ref home) = config.env_home {
431 ElectionAcceptorState::with_env_home(
432 config.node_name.clone(),
433 1,
434 home,
435 )
436 } else {
437 ElectionAcceptorState::new(config.node_name.clone(), 1)
438 });
439 if let Some(ref dispatcher) = tcp_dispatcher {
440 let service = PeerFeederService::new(Arc::clone(&peer_scanner));
441 dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(service));
442 log::debug!(
443 "Node '{}' PEER_FEEDER service registered",
444 config.node_name,
445 );
446 // F6: register the ELECTION service so peers can run
447 // run_acceptor against this node when proposing.
448 let election_svc =
449 Arc::new(ElectionService::new(Arc::clone(&election_state)));
450 dispatcher.register(ELECTION_SERVICE_NAME, election_svc);
451 log::debug!(
452 "Node '{}' ELECTION service registered",
453 config.node_name,
454 );
455 }
456
457 let env = Self {
458 config,
459 node_state,
460 group_service,
461 vlsn_index,
462 ack_tracker,
463 stats,
464 feeders,
465 replica_stream,
466 master_tracker,
467 listeners: RwLock::new(Vec::new()),
468 shutdown: AtomicBool::new(false),
469 tcp_dispatcher,
470 bound_addr,
471 env_impl: StdMutex::new(None),
472 io_threads: StdMutex::new(Vec::new()),
473 io_shutdown: AtomicBool::new(false),
474 restore_registered: AtomicBool::new(restore_registered_init),
475 peer_scanner,
476 dtvlsn: std::sync::atomic::AtomicU64::new(0),
477 election_state,
478 self_weak: OnceLock::new(),
479 feeder_channels: StdMutex::new(HashMap::new()),
480 feeder_queues: std::sync::RwLock::new(HashMap::new()),
481 active_feeder_runners: StdMutex::new(HashMap::new()),
482 wal_vlsn_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
483 };
484
485 Ok(env)
486 }
487
488 /// Open a replicated environment with the standard production
489 /// lifecycle.
490 ///
491 /// This is the entry point recommended by the mdBook chapters:
492 /// it allocates the `ReplicatedEnvironment`, registers all
493 /// services on the TCP dispatcher, and spawns the **election
494 /// driver** background thread that runs Paxos rounds against
495 /// known peers until the node has resolved into either Master or
496 /// Replica state.
497 ///
498 /// Closes finding F6 of the 2026 review.
499 ///
500 /// Use [`ReplicatedEnvironment::new`] directly only when the
501 /// caller plans to drive state transitions explicitly (test
502 /// harnesses, scripted bootstrap, recovery tooling).
503 pub fn open(config: RepConfig) -> Result<Arc<Self>> {
504 let env = Arc::new(Self::new(config)?);
505 env.init_self_weak();
506 env.start_election_driver();
507 env.start_vlsn_persistence_daemon();
508 env.register_admin_service();
509 Ok(env)
510 }
511
512 /// Build the service dispatcher for this node.
513 ///
514 /// Phase 3 logic: when `config.transport_kind == Tls` AND
515 /// `config.tls_config` is `Some`, start a
516 /// [`crate::net::service_dispatcher::TlsTcpServiceDispatcher`] that
517 /// enforces mTLS with the configured `peer_allowlist`. Otherwise
518 /// start the plain-TCP [`TcpServiceDispatcher`].
519 ///
520 /// Returns `(dispatcher, bound_addr)` or a `RepError` on bind / TLS
521 /// config failure.
522 fn build_dispatcher(
523 #[cfg_attr(not(feature = "tls-rustls"), allow(unused_variables))]
524 config: &RepConfig,
525 addr: SocketAddr,
526 ) -> Result<(AnyServiceDispatcher, SocketAddr)> {
527 #[cfg(feature = "tls-rustls")]
528 if config.transport_kind == crate::rep_config::RepTransportKind::Tls {
529 use crate::auth::PeerAllowlist;
530 use crate::net::service_dispatcher::TlsTcpServiceDispatcher;
531 let tls = config.tls_config.as_ref().ok_or_else(|| {
532 RepError::ConfigError(format!(
533 "node '{}': transport_kind=Tls requires a tls_config",
534 config.node_name,
535 ))
536 })?;
537 let allowlist =
538 PeerAllowlist::new(config.peer_allowlist.iter().cloned());
539 // Fail-closed: an empty allowlist with TLS transport is a
540 // misconfiguration. The same policy is enforced at the TLS
541 // listener and QUIC constructors; downgrading to plain TCP here
542 // would be a silent security regression for a node that asked
543 // for TLS.
544 if allowlist.is_empty() {
545 return Err(RepError::ConfigError(format!(
546 "node '{}': transport_kind=Tls requires a non-empty \
547 peer_allowlist (mTLS enforcement is fail-closed)",
548 config.node_name,
549 )));
550 }
551 let disp = TlsTcpServiceDispatcher::new(addr, tls, allowlist)?;
552 let bound = disp.start()?;
553 return Ok((AnyServiceDispatcher::Tls(disp), bound));
554 }
555 // Plain-TCP dispatcher (default or when TLS config is missing).
556 let disp = TcpServiceDispatcher::new(addr).map_err(|e| {
557 RepError::NetworkError(format!("TCP dispatcher init: {e}"))
558 })?;
559 let bound = disp.start()?;
560 Ok((AnyServiceDispatcher::Plain(disp), bound))
561 }
562
563 /// Populate the env's self-referential `Weak` so background
564 /// threads can obtain a back-reference for auto-orchestrated
565 /// follow-up actions (e.g. replica auto-bootstrap on
566 /// `NeedsRestore`). Idempotent: subsequent calls are silent
567 /// no-ops because the inner [`OnceLock`] only accepts one set.
568 ///
569 /// Callers that wrap the env in `Arc` and want auto-bootstrap
570 /// behaviour should call this immediately after construction.
571 /// `Self::open` already does so. Test harnesses that drive
572 /// transitions manually (`RepTestBase`) also call this so the
573 /// auto-bootstrap path is exercised in tests.
574 pub fn init_self_weak(self: &Arc<Self>) {
575 let _ = self.self_weak.set(Arc::downgrade(self));
576 }
577
578 /// Register the `ADMIN` service handler on the TCP dispatcher.
579 ///
580 /// Closes findings F7 / F8. Holds a `Weak<Self>` so the handler
581 /// does not extend the env's lifetime. Idempotent: re-registering
582 /// is harmless because `TcpServiceDispatcher::register` overwrites
583 /// the existing handler.
584 pub fn register_admin_service(self: &Arc<Self>) {
585 if let Some(ref dispatcher) = self.tcp_dispatcher {
586 crate::group_admin::register_admin_service(
587 dispatcher,
588 Arc::downgrade(self),
589 );
590 log::debug!(
591 "Node '{}' ADMIN service registered",
592 self.config.node_name,
593 );
594 }
595 }
596
597 /// Spawn the VLSN-index persistence daemon (F11).
598 ///
599 /// Periodically (every 2 seconds) snapshots the in-memory
600 /// `VlsnIndex` to `<env_home>/vlsn.idx` so a clean restart can
601 /// resume from where the replica left off without a full network
602 /// restore. No-op when `config.env_home` is `None`.
603 ///
604 /// Idempotent: only one daemon is ever spawned per env.
605 pub fn start_vlsn_persistence_daemon(self: &Arc<Self>) {
606 let Some(home) = self.config.env_home.clone() else {
607 return;
608 };
609 {
610 let threads = self.io_threads.lock().unwrap();
611 if threads.iter().any(|h| {
612 h.thread()
613 .name()
614 .is_some_and(|n| n.starts_with("noxu-vlsn-flush-"))
615 }) {
616 return;
617 }
618 }
619
620 let vlsn_index = Arc::clone(&self.vlsn_index);
621 let name = format!("noxu-vlsn-flush-{}", self.config.node_name);
622 let me = Arc::clone(self);
623 let interval = Duration::from_secs(2);
624
625 let handle = std::thread::Builder::new()
626 .name(name)
627 .spawn(move || {
628 use std::sync::atomic::Ordering;
629 let mut last_persisted_vlsn: u64 = 0;
630 while !me.io_shutdown.load(Ordering::SeqCst)
631 && !me.is_shutdown()
632 {
633 std::thread::sleep(interval);
634 if me.io_shutdown.load(Ordering::SeqCst) {
635 break;
636 }
637 let latest = vlsn_index.get_latest_vlsn();
638 if latest == last_persisted_vlsn {
639 // Nothing new to flush.
640 continue;
641 }
642 // X-2: cap the flush at the last durable checkpoint's
643 // end LSN so the persisted VLSN index never claims
644 // VLSNs beyond the durable B-tree state. After a crash
645 // the recovered tree and the index will be coherent.
646 let cap_lsn = me
647 .env_impl
648 .lock()
649 .unwrap()
650 .as_ref()
651 .and_then(|e| e.get_checkpointer())
652 .map(|c| c.get_last_checkpoint_end())
653 .unwrap_or(noxu_util::NULL_LSN);
654 match crate::vlsn::persist::flush_to_disk_capped(
655 &vlsn_index,
656 &home,
657 cap_lsn,
658 ) {
659 Ok(n) => {
660 log::trace!(
661 "vlsn-flush: persisted {} entries (latest vlsn={}, cap_lsn={:?})",
662 n,
663 latest,
664 cap_lsn,
665 );
666 last_persisted_vlsn = latest;
667 }
668 Err(e) => {
669 log::warn!(
670 "vlsn-flush: failed to persist VLSN index to {}: {}",
671 home.display(),
672 e
673 );
674 }
675 }
676 }
677 // Final flush on shutdown so a clean close is recoverable.
678 // Cap at the last checkpoint even for the shutdown flush.
679 let cap_lsn = me
680 .env_impl
681 .lock()
682 .unwrap()
683 .as_ref()
684 .and_then(|e| e.get_checkpointer())
685 .map(|c| c.get_last_checkpoint_end())
686 .unwrap_or(noxu_util::NULL_LSN);
687 if let Err(e) = crate::vlsn::persist::flush_to_disk_capped(
688 &vlsn_index,
689 &home,
690 cap_lsn,
691 ) {
692 log::warn!(
693 "vlsn-flush (final): failed to persist VLSN index: {}",
694 e
695 );
696 }
697 })
698 .expect("failed to spawn noxu-vlsn-flush thread");
699
700 self.io_threads.lock().unwrap().push(handle);
701 log::debug!(
702 "Node '{}' VLSN persistence daemon started",
703 self.config.node_name,
704 );
705 }
706
707 /// Spawn the election driver background thread.
708 ///
709 /// While the env is in `Detached` or `Unknown` state and no master
710 /// is known, the driver periodically attempts a Paxos election
711 /// against peers in `GroupService` (whose ELECTION services were
712 /// registered at `open()` time). On success the driver calls
713 /// `become_master` (if this node is the winner) or `become_replica`
714 /// (otherwise). On failure (no quorum), the driver waits
715 /// `config.election_timeout` and tries again.
716 ///
717 /// The driver respects `io_shutdown`; on env close the loop exits
718 /// promptly.
719 ///
720 /// Idempotent: a second call is a no-op (only one driver thread is
721 /// ever spawned per env).
722 pub fn start_election_driver(self: &Arc<Self>) {
723 use std::sync::atomic::Ordering;
724 // Reuse io_shutdown for cancellation; a successful spawn is
725 // recorded by appending to io_threads, so a duplicate call
726 // would re-add a thread — we use a one-shot `AtomicBool`
727 // sentinel placed in the io_shutdown's slot via a new field.
728 // Cheaper: a static name check on io_threads is impossible;
729 // instead, gate spawning on whether any io_thread already
730 // carries the driver name.
731 {
732 let threads = self.io_threads.lock().unwrap();
733 if threads.iter().any(|h| {
734 h.thread()
735 .name()
736 .is_some_and(|n| n.starts_with("noxu-election-"))
737 }) {
738 return;
739 }
740 }
741
742 let me = Arc::clone(self);
743 let name = format!("noxu-election-{}", self.config.node_name);
744 let handle = std::thread::Builder::new()
745 .name(name)
746 .spawn(move || {
747 me.run_election_loop();
748 })
749 .expect("failed to spawn election driver thread");
750 self.io_threads.lock().unwrap().push(handle);
751 log::debug!("Node '{}' election driver started", self.config.node_name,);
752 // Keep ordering sane on the io_shutdown flag.
753 let _ = self.io_shutdown.load(Ordering::SeqCst);
754 }
755
756 /// Body of the election driver loop. Public only for tests; called
757 /// by [`Self::start_election_driver`].
758 fn run_election_loop(self: Arc<Self>) {
759 use std::sync::atomic::Ordering;
760 // Maintain an internal monotonically increasing election term.
761 // Each successful or failed round bumps the term so retries do
762 // not collide with stale acceptor promises.
763 let mut term: u64 = 1;
764
765 loop {
766 if self.io_shutdown.load(Ordering::SeqCst) {
767 return;
768 }
769 if self.is_shutdown() {
770 return;
771 }
772
773 let state = self.node_state.get_state();
774 // Stop driving once we've resolved into Master/Replica;
775 // re-arm only if the node returns to Unknown.
776 if matches!(state, NodeState::Master | NodeState::Replica) {
777 std::thread::sleep(Duration::from_millis(200));
778 continue;
779 }
780 if matches!(state, NodeState::Shutdown) {
781 return;
782 }
783
784 // Probe peers for an active master via the existing
785 // GroupService cache. In the absence of a heartbeat path
786 // we rely on master_tracker (set by become_replica from
787 // the receive loop).
788 if let Some(master_name) = self.master_tracker.get_master()
789 && master_name != self.config.node_name
790 {
791 let _ = self.become_replica(&master_name);
792 continue;
793 }
794
795 // Snapshot peers to dial for ELECTION.
796 let peers: Vec<(String, SocketAddr)> = self
797 .group_service
798 .get_all_nodes()
799 .into_iter()
800 .filter(|n| n.name != self.config.node_name)
801 .filter_map(|n| {
802 format!("{}:{}", n.host, n.port)
803 .parse::<SocketAddr>()
804 .ok()
805 .map(|a| (n.name, a))
806 })
807 .collect();
808
809 // Build the local rep group view used by run_election to
810 // compute quorum and resolve the winner name. Include
811 // self.
812 let group = self.local_rep_group_with_self();
813
814 // Update election state for any concurrent acceptor calls.
815 let our_vlsn = self.vlsn_index.get_latest_vlsn();
816 self.election_state.set_vlsn(our_vlsn);
817 self.election_state.set_term(term);
818 // D2: advertise our DTVLSN as the major election-ranking key.
819 self.election_state.set_dtvlsn(self.get_dtvlsn());
820
821 // Connect to each peer's ELECTION service. Failures are
822 // tolerated: a peer that doesn't answer simply contributes
823 // no vote. The election may still reach quorum in the
824 // remaining peers.
825 let mut channels: Vec<Arc<dyn crate::net::channel::Channel>> =
826 Vec::new();
827 for (peer_name, addr) in &peers {
828 match crate::net::service_dispatcher::connect_to_service(
829 *addr,
830 ELECTION_SERVICE_NAME,
831 ) {
832 Ok(ch) => {
833 let arc: Arc<dyn crate::net::channel::Channel> =
834 Arc::new(ch);
835 channels.push(arc);
836 }
837 Err(e) => {
838 log::trace!(
839 "election driver: peer {} ({}) unreachable: {}",
840 peer_name,
841 addr,
842 e
843 );
844 }
845 }
846 }
847
848 // Resolve our own node_id from the group; if not present
849 // we cannot run an election (closed-world guard — see F22).
850 let self_node_id =
851 group.get_node(&self.config.node_name).map(|n| n.node_id());
852 let self_node_id = match self_node_id {
853 Some(id) => id,
854 None => {
855 log::warn!(
856 "election driver: node '{}' not registered in \
857 own group view; sleeping",
858 self.config.node_name
859 );
860 std::thread::sleep(Duration::from_millis(200));
861 continue;
862 }
863 };
864
865 log::debug!(
866 "election driver on '{}': starting term={} with {} peers",
867 self.config.node_name,
868 term,
869 channels.len(),
870 );
871 let outcome = crate::elections::paxos::run_election_with_phi_dtvlsn(
872 self_node_id,
873 &self.config.node_name,
874 &group,
875 &channels,
876 our_vlsn,
877 /* priority */ 1,
878 term,
879 /* own_dtvlsn (D2 major ranking key) */
880 self.get_dtvlsn(),
881 None,
882 std::time::Duration::from_millis(500),
883 );
884
885 match outcome {
886 Some(winner_id) if winner_id == self_node_id => {
887 if let Err(e) = self.become_master(term) {
888 log::warn!(
889 "election driver: become_master failed: {}",
890 e
891 );
892 } else {
893 log::info!(
894 "election driver: '{}' became master at term {}",
895 self.config.node_name,
896 term,
897 );
898 }
899 }
900 Some(winner_id) => {
901 if let Some(winner_node) = group
902 .get_nodes()
903 .into_iter()
904 .find(|n| n.node_id() == winner_id)
905 {
906 if let Err(e) = self.become_replica(&winner_node.name) {
907 log::warn!(
908 "election driver: become_replica failed: {}",
909 e
910 );
911 } else {
912 log::info!(
913 "election driver: '{}' became replica of '{}' at term {}",
914 self.config.node_name,
915 winner_node.name,
916 term,
917 );
918 }
919 }
920 }
921 None => {
922 log::debug!(
923 "election driver on '{}' term={}: no quorum",
924 self.config.node_name,
925 term,
926 );
927 }
928 }
929
930 term = term.saturating_add(1);
931 // Back off so we don't pin the loop on transient failures.
932 std::thread::sleep(
933 self.config.election_timeout.min(Duration::from_millis(500)),
934 );
935 }
936 }
937
938 /// Internal: a `RepGroup` snapshot that includes self.
939 fn local_rep_group_with_self(&self) -> crate::rep_group::RepGroup {
940 let mut group = self.get_rep_group();
941 // Ensure self is present in the group view; the
942 // group_service does not auto-register the local node.
943 if group.get_node(&self.config.node_name).is_none() {
944 let mut self_node = crate::rep_node::RepNode::new(
945 self.config.node_name.clone(),
946 self.config.node_type,
947 self.config.node_host.clone(),
948 self.config.node_port,
949 /* node_id */ 0,
950 );
951 // Stable self node_id derived from the name hash so
952 // re-creations in the same process don't collide.
953 use std::hash::{Hash, Hasher};
954 let mut hasher = std::collections::hash_map::DefaultHasher::new();
955 self.config.node_name.hash(&mut hasher);
956 // Restrict to a u32 range and avoid 0 (reserved for
957 // "unknown").
958 let id = ((hasher.finish() as u32) | 1).max(1);
959 self_node.node_id = id;
960 group.add_node(self_node);
961 }
962 group
963 }
964
/// Return the socket address the TCP service dispatcher is bound to.
///
/// This may differ from the configured `node_port` when port 0 is used
/// (the OS assigns an ephemeral port). Returns `None` if the dispatcher
/// could not be started (e.g. the address is not resolvable).
///
/// The value is simply read from the `bound_addr` field; no I/O is
/// performed by this call.
pub fn bound_addr(&self) -> Option<SocketAddr> {
    self.bound_addr
}
973
974 /// Wire a live `EnvironmentImpl` into this replicated environment.
975 ///
976 /// After this call, state transitions (`become_master`, `become_replica`)
977 /// will spawn real feeder/receiver I/O threads backed by the live log.
978 ///
979 /// If the RESTORE service was not registered at construction time (because
980 /// `config.env_home` was `None`), it is registered here using the
981 /// environment's actual home path. This mirrors`RepNode.envSetup()`
982 /// which registers the restore handler during environment wiring.
983 ///
984 /// Environment reference wiring.
985 /// `EnvironmentImpl` via `RepImpl.repNode.envImpl` in HA.
986 pub fn with_environment(&self, env: Arc<EnvironmentImpl>) {
987 // Register RESTORE service lazily if not already done.
988 if !self.restore_registered.load(Ordering::SeqCst)
989 && let Some(ref dispatcher) = self.tcp_dispatcher
990 {
991 let env_home = env.get_env_home().to_path_buf();
992 let restore_server = NetworkRestoreServer::new(env_home.clone());
993 dispatcher.register(RESTORE_SERVICE_NAME, Arc::new(restore_server));
994 self.restore_registered.store(true, Ordering::SeqCst);
995 log::debug!(
996 "Node '{}' RESTORE service registered via with_environment \
997 (env_home={})",
998 self.config.node_name,
999 env_home.display(),
1000 );
1001 }
1002
1003 // X-14: rebuild the VLSN index from recovery-replayed LN entries.
1004 // After a crash the on-disk vlsn.idx may be stale (either ahead of
1005 // the recovered B-tree, or behind if vlsn.idx was not flushed
1006 // after the last checkpoint). Re-registering all (vlsn, lsn) pairs
1007 // from the redo pass gives a consistent in-memory index.
1008 if !env.recovery_vlsns.is_empty() {
1009 log::info!(
1010 "Node '{}': rebuilding VLSN index from {} recovered entries",
1011 self.config.node_name,
1012 env.recovery_vlsns.len(),
1013 );
1014 for &(vlsn, lsn_u64) in &env.recovery_vlsns {
1015 let lsn = noxu_util::Lsn::from_u64(lsn_u64);
1016 self.vlsn_index.register(
1017 vlsn,
1018 lsn.file_number(),
1019 lsn.file_offset(),
1020 );
1021 }
1022 }
1023
1024 // X-1: truncate the VLSN index to the rollback matchpoint if recovery
1025 // detected a completed rollback period. The matchpoint is the highest
1026 // LSN that is still valid after the rollback; entries with higher VLSNs
1027 // correspond to data that was rolled back and must not appear in the
1028 // index.
1029 if let Some(matchpoint_lsn_u64) = env.recovery_rollback_matchpoint {
1030 // Find the latest VLSN whose LSN is at or before the matchpoint.
1031 // Scan the recovered VLSN pairs (sorted ascending) to find the
1032 // boundary.
1033 let safe_vlsn = env
1034 .recovery_vlsns
1035 .iter()
1036 .rev()
1037 .find(|&&(_, lsn_u64)| lsn_u64 <= matchpoint_lsn_u64)
1038 .map(|&(vlsn, _)| vlsn)
1039 .unwrap_or(0);
1040 log::info!(
1041 "Node '{}': truncating VLSN index after vlsn={} \
1042 (rollback matchpoint lsn={:#x})",
1043 self.config.node_name,
1044 safe_vlsn,
1045 matchpoint_lsn_u64,
1046 );
1047 self.vlsn_index.truncate_after(safe_vlsn);
1048 }
1049
1050 *self.env_impl.lock().unwrap() = Some(Arc::clone(&env));
1051
1052 // C-C2b: install the VLSN counter so log_txn_commit writes
1053 // VLSN-tagged headers. When become_master then spawns an
1054 // EnvironmentLogScanner-backed FeederRunner, it will find these
1055 // entries and auto-feed them to replicas without any
1056 // replicate_entry call from the application.
1057 env.set_replication_vlsn_counter(Arc::clone(&self.wal_vlsn_counter));
1058 }
1059
/// Get the current node state.
///
/// Returns the state currently reported by this node's state machine
/// (`node_state`). If the caller's intent is to track the state of the
/// node over time, a `StateChangeListener` may be a more convenient and
/// efficient approach than polling this method.
pub fn get_state(&self) -> NodeState {
    self.node_state.get_state()
}
1070
1071 /// Check if this node is the master.
1072 ///
1073 /// Returns true if the node's current state is Master.
1074 pub fn is_master(&self) -> bool {
1075 self.node_state.get_state() == NodeState::Master
1076 }
1077
1078 /// Returns true if this node is an *authoritative* master (D4, JE
1079 /// `ElectionQuorum.isAuthoritativeMaster`): it is the group master AND it
1080 /// is still connected to enough replicas that, including itself, a
1081 /// SIMPLE_MAJORITY quorum is present.
1082 ///
1083 /// A master on the minority side of a network partition is NOT
1084 /// authoritative — it must not claim the special election ranking
1085 /// (`MASTER_RANKING`) nor (eventually) continue accepting writes, so the
1086 /// majority side can elect a fresh master without it competing
1087 /// (split-brain prevention).
1088 ///
1089 /// "Active replica count" = the number of currently-connected push-feeder
1090 /// runners serving *electable* peers (Monitors/Secondaries do not count
1091 /// toward the election quorum). `+ 1` for this master itself.
1092 pub fn is_authoritative_master(&self) -> bool {
1093 if !self.is_master() {
1094 return false;
1095 }
1096 let group = self.get_rep_group();
1097 // Total electable nodes (incl. self) — peers + this master.
1098 let electable_total: usize = group
1099 .get_nodes()
1100 .iter()
1101 .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
1102 .count()
1103 + 1; // +1 for self/master (not registered as a peer)
1104
1105 // Active replicas = connected feeder runners whose peer is electable.
1106 let active_electable_replicas: usize = {
1107 let runners = self.active_feeder_runners.lock().unwrap();
1108 runners
1109 .keys()
1110 .filter(|name| {
1111 group
1112 .get_node(name)
1113 .map(|n| {
1114 n.node_type == crate::node_type::NodeType::Electable
1115 })
1116 .unwrap_or(false)
1117 })
1118 .count()
1119 };
1120 Self::authoritative_quorum_met(
1121 active_electable_replicas,
1122 electable_total,
1123 )
1124 }
1125
/// Pure SIMPLE_MAJORITY check backing `is_authoritative_master` (JE
/// `ElectionQuorum.isAuthoritativeMaster`).
///
/// The master counts itself as present, so the quorum is met when
/// `active_electable_replicas + 1` reaches `electable_total / 2 + 1`.
fn authoritative_quorum_met(
    active_electable_replicas: usize,
    electable_total: usize,
) -> bool {
    // Nodes present on this side: the connected replicas plus the master.
    let present = active_electable_replicas + 1;
    // Smallest node count that forms a simple majority.
    let majority = electable_total / 2 + 1;
    majority <= present
}
1136
1137 /// Check if this node is a replica.
1138 ///
1139 /// Returns true if the node's current state is Replica.
1140 pub fn is_replica(&self) -> bool {
1141 self.node_state.get_state() == NodeState::Replica
1142 }
1143
1144 /// Returns true if the node is currently participating in the group
1145 /// as a Replica or a Master.
1146 pub fn is_active(&self) -> bool {
1147 self.node_state.get_state().is_active()
1148 }
1149
/// Get the node name.
///
/// Returns the name used to identify this replicated environment, as
/// configured in `config.node_name`. The returned slice borrows from the
/// configuration held by `self`.
pub fn get_node_name(&self) -> &str {
    self.config.node_name.as_str()
}
1158
/// Get the group name.
///
/// Returns the name of the replication group this node belongs to, as
/// configured in `config.group_name`. The returned slice borrows from the
/// configuration held by `self`.
pub fn get_group_name(&self) -> &str {
    self.config.group_name.as_str()
}
1165
/// Get the current master (if known).
///
/// Returns whatever the `MasterTracker` currently reports: the name of the
/// node that is the master, or `None` if the master is not known (e.g. the
/// node is in the Unknown or Detached state).
pub fn get_master_name(&self) -> Option<String> {
    self.master_tracker.get_master()
}
1174
/// Get the replication group info.
///
/// Returns a shared reference to this node's `GroupService`, which holds
/// the description of the replication group as known by this node.
///
/// The replicated group metadata is stored in a replicated database and
/// updates are propagated by the current master node to all replicas. If
/// this node is not the master, it is possible for its description of the
/// group to be out of date.
pub fn get_group(&self) -> &GroupService {
    &self.group_service
}
1187
1188 /// Add a peer node to the replication group at runtime.
1189 ///
1190 /// The node is registered in the `GroupService` so elections and quorum
1191 /// calculations immediately reflect the new membership.
1192 pub fn add_peer(&self, node: crate::rep_node::RepNode) -> Result<()> {
1193 use crate::group_service::NodeInfo;
1194 use std::time::Instant;
1195
1196 let info = NodeInfo {
1197 name: node.name.clone(),
1198 node_type: node.node_type,
1199 host: node.host.clone(),
1200 port: node.port,
1201 node_id: node.node_id,
1202 joined_at: Instant::now(),
1203 last_seen: Instant::now(),
1204 is_active: true,
1205 known_vlsn: 0,
1206 log_range: None,
1207 read_capacity_pct: node.read_capacity_pct,
1208 write_capacity_pct: node.write_capacity_pct,
1209 latency_hint_ms: node.latency_hint_ms,
1210 };
1211 self.group_service.add_node(info)?;
1212 log::info!(
1213 "Node '{}': added peer '{}' ({}:{}) to group '{}'",
1214 self.config.node_name,
1215 node.name,
1216 node.host,
1217 node.port,
1218 self.config.group_name,
1219 );
1220
1221 // F9: if we are the current master, immediately register a
1222 // `Feeder` tracker for the new peer so AckTracker bookkeeping
1223 // and downstream pull-based streaming work without a forced
1224 // re-election.
1225 if self.is_master()
1226 && (node.node_type == crate::node_type::NodeType::Electable
1227 || node.node_type == crate::node_type::NodeType::Secondary)
1228 {
1229 let mut feeders = self.feeders.write();
1230 if !feeders.iter().any(|f| f.get_replica_name() == node.name) {
1231 feeders.push(Feeder::new(node.name.clone()));
1232 log::debug!(
1233 "Node '{}' (master): dispatched Feeder for new peer '{}'",
1234 self.config.node_name,
1235 node.name,
1236 );
1237 }
1238 }
1239 Ok(())
1240 }
1241
1242 /// Remove a peer node from the replication group by name.
1243 ///
1244 /// The node is deregistered from the `GroupService`. Elections initiated
1245 /// after this call will not include the removed node in quorum calculations.
1246 pub fn remove_peer(&self, name: &str) -> Result<()> {
1247 self.group_service.remove_node(name)?;
1248 log::info!(
1249 "Node '{}': removed peer '{}' from group '{}'",
1250 self.config.node_name,
1251 name,
1252 self.config.group_name,
1253 );
1254 Ok(())
1255 }
1256
1257 /// Update the capacity and latency metadata of an existing peer.
1258 ///
1259 /// Only the following fields are updated from `node`:
1260 /// - `read_capacity_pct`
1261 /// - `write_capacity_pct`
1262 /// - `latency_hint_ms`
1263 ///
1264 /// The node's identity (name, address, port, node_type) is preserved.
1265 /// Safe to call while replication is active.
1266 ///
1267 /// If the quorum policy is `Flexible` or `Expression`, the quorum system
1268 /// is rebuilt to reflect the new capacity/latency weights.
1269 ///
1270 /// # Note
1271 ///
1272 /// `update_peer_metadata` does not currently re-run
1273 /// `QuorumPolicy::validate(electable_count)` after the metadata
1274 /// change. An LP-optimal `Expression` quorum that was safe before
1275 /// the update may no longer satisfy the intersection property
1276 /// afterwards. Until automatic revalidation lands, deployments
1277 /// using `QuorumPolicy::Expression` should call
1278 /// `quorum_policy().validate(get_rep_group().electable_count())`
1279 /// on the returned `RepGroup` after every metadata change and
1280 /// fail the operator-facing operation if validation reports
1281 /// unsafety.
1282 pub fn update_peer_metadata(
1283 &self,
1284 name: &str,
1285 node: crate::rep_node::RepNode,
1286 ) -> Result<()> {
1287 self.group_service.update_node_metadata(
1288 name,
1289 node.read_capacity_pct,
1290 node.write_capacity_pct,
1291 node.latency_hint_ms,
1292 )?;
1293 log::info!(
1294 "Node '{}': updated metadata for peer '{}' \
1295 (read_cap={}, write_cap={}, latency={}ms)",
1296 self.config.node_name,
1297 name,
1298 node.read_capacity_pct,
1299 node.write_capacity_pct,
1300 node.latency_hint_ms,
1301 );
1302 Ok(())
1303 }
1304
1305 /// Returns a snapshot of the current replication group as a `RepGroup`.
1306 ///
1307 /// The snapshot reflects the state at the time of the call; subsequent
1308 /// `add_peer` / `remove_peer` calls are not reflected in it.
1309 pub fn get_rep_group(&self) -> crate::rep_group::RepGroup {
1310 use crate::rep_group::RepGroup;
1311
1312 let mut group = RepGroup::new(
1313 self.config.group_name.clone(),
1314 self.group_service.get_group_id(),
1315 );
1316 for info in self.group_service.get_all_nodes() {
1317 let mut node = crate::rep_node::RepNode::new(
1318 info.name.clone(),
1319 info.node_type,
1320 info.host.clone(),
1321 info.port,
1322 info.node_id,
1323 );
1324 node.read_capacity_pct = info.read_capacity_pct;
1325 node.write_capacity_pct = info.write_capacity_pct;
1326 node.latency_hint_ms = info.latency_hint_ms;
1327 group.add_node(node);
1328 }
1329 group
1330 }
1331
/// Get the replication configuration.
///
/// Returns a shared reference to the replication configuration that has
/// been used to create this environment.
pub fn get_config(&self) -> &RepConfig {
    &self.config
}
1341
/// Get the current VLSN range on this node.
///
/// Returns the range of VLSNs currently available on this node, as
/// reported by the shared VLSN index.
pub fn get_vlsn_range(&self) -> VlsnRange {
    self.vlsn_index.get_range()
}
1348
/// Get the latest VLSN.
///
/// Returns the most recent VLSN registered on this node, as reported by
/// the shared VLSN index.
pub fn get_current_vlsn(&self) -> u64 {
    self.vlsn_index.get_latest_vlsn()
}
1355
/// Test-only: clone the env's SHARED VLSN index `Arc`.
///
/// Only compiled when the `test-harness` feature is enabled.
///
/// REP-6: the replica receive loop (`become_replica` ->
/// `EnvironmentLogWriter`) must feed THIS index — the one
/// `get_vlsn_range`, `flush_to_disk`, and election ranking read — not a
/// throwaway. Tests use this to build a writer the same way
/// `become_replica` does and assert the shared index advances.
///
/// The returned `Arc` points at the same index instance held by `self`;
/// no copy of the index is made.
#[cfg(feature = "test-harness")]
pub fn vlsn_index_arc(&self) -> Arc<crate::vlsn::vlsn_index::VlsnIndex> {
    Arc::clone(&self.vlsn_index)
}
1367
1368 /// Return the list of replica names that currently have a `Feeder`
1369 /// tracker on this (master) node.
1370 ///
1371 /// Used by tests and operator tooling. The returned list reflects
1372 /// the master's view at the time of the call; subsequent
1373 /// `add_peer`/`remove_peer` calls may change it.
1374 pub fn feeder_replica_names(&self) -> Vec<String> {
1375 self.feeders.read().iter().map(|f| f.get_replica_name()).collect()
1376 }
1377
1378 // -----------------------------------------------------------------------
1379 // C-C2 — active push feeder API
1380 // -----------------------------------------------------------------------
1381
1382 /// Register a channel for pushing log entries to a specific replica.
1383 ///
1384 /// When [`Self::become_master`] is called — or if the node is **already
1385 /// master** — a [`FeederRunner`] background thread is immediately spawned
1386 /// for this channel. The thread reads from a dedicated in-memory queue
1387 /// that is fed by [`Self::replicate_entry`] / [`Self::apply_entry`], and
1388 /// sends framed log entries to the replica over `channel`. Acks sent
1389 /// back by the replica are visible via
1390 /// [`Self::active_feeder_runner_acked_vlsn`].
1391 ///
1392 /// # Production vs. test use
1393 ///
1394 /// *Production*: pass a [`crate::net::TcpChannel`] connected to the
1395 /// replica's inbound feeder service.
1396 /// *Tests*: pass one half of a [`crate::net::LocalChannelPair`].
1397 ///
1398 /// # Note on push vs. pull
1399 ///
1400 /// Registering a channel activates the **push** path: the master
1401 /// initiates and owns the feeder connection. The existing **pull** path
1402 /// (`PeerFeederService` / `catch_up_from_peer`) continues to operate in
1403 /// parallel for replicas that connect proactively. Do not register a
1404 /// channel for a replica that already connects via the pull path, or
1405 /// entries may be delivered twice.
1406 ///
1407 /// If `become_master` was called *before* registering the channel, call
1408 /// this method afterward; it will spawn the FeederRunner immediately.
1409 pub fn register_feeder_channel(
1410 &self,
1411 replica_name: String,
1412 channel: Arc<dyn crate::net::Channel>,
1413 ) {
1414 {
1415 let mut ch = self.feeder_channels.lock().unwrap();
1416 ch.insert(replica_name.clone(), Arc::clone(&channel));
1417 }
1418 if self.is_master() {
1419 self.spawn_feeder_runner(replica_name, channel);
1420 }
1421 }
1422
1423 /// Return the last VLSN acknowledged by the FeederRunner for `replica_name`.
1424 ///
1425 /// Returns `0` if no FeederRunner is currently active for that replica
1426 /// (either `become_master` was not called yet, or no channel was
1427 /// registered). Use this to poll catch-up progress before shutdown.
1428 pub fn active_feeder_runner_acked_vlsn(&self, replica_name: &str) -> u64 {
1429 self.active_feeder_runners
1430 .lock()
1431 .unwrap()
1432 .get(replica_name)
1433 .map(|r| r.known_replica_vlsn())
1434 .unwrap_or(0)
1435 }
1436
1437 /// Spawn a FeederRunner thread for `replica_name` using `channel`.
1438 ///
1439 /// Creates a dedicated `PeerLogScanner` queue for the replica, registers
1440 /// it in `feeder_queues` so that future `replicate_entry` / `apply_entry`
1441 /// calls fan out into it, spawns the `FeederRunner::run` loop, and
1442 /// records the `Arc<FeederRunner>` in `active_feeder_runners`.
1443 ///
1444 /// Idempotent: if a FeederRunner is already active for `replica_name`
1445 /// (from a prior `become_master` call), it is replaced — the old channel
1446 /// should have been closed already via `close()`.
1447 ///
1448 /// **WAL-scanner auto-feed path (C-C2b)**: when a live `EnvironmentImpl`
1449 /// has been wired via `with_environment`, the FeederRunner thread uses an
1450 /// `EnvironmentLogScanner` as its source. Every `log_txn_commit` on the
1451 /// master writes a VLSN-tagged WAL entry (22-byte header); the scanner
1452 /// finds these entries and streams them to the replica automatically,
1453 /// without any `replicate_entry` call from the application.
1454 ///
1455 /// **Fallback path**: when no `EnvironmentImpl` is wired the runner reads
1456 /// from the in-memory `PeerLogScanner` queue populated by
1457 /// `replicate_entry` / `apply_entry` — the previous manual behaviour.
1458 fn spawn_feeder_runner(
1459 &self,
1460 replica_name: String,
1461 channel: Arc<dyn crate::net::Channel>,
1462 ) {
1463 // Dedicated entry queue: entries flowing from this master reach the
1464 // FeederRunner without competing with PeerFeederService.
1465 let queue = Arc::new(PeerLogScanner::new());
1466 {
1467 self.feeder_queues
1468 .write()
1469 .unwrap()
1470 .insert(replica_name.clone(), Arc::clone(&queue));
1471 }
1472
1473 // REP-9 Part 1: wire an ack sink so the FeederRunner forwards every
1474 // inbound replica ack to `env.record_ack(vlsn, replica_name)`, which
1475 // reaches BOTH the AckTracker (commit-blocking quorum) and the
1476 // matching `Feeder::acked_vlsn` (DTVLSN ranking). Without this the
1477 // ack reached only the runner's private `known_replica_vlsn`. The
1478 // sink holds a `Weak<Self>` so it never extends the env's lifetime;
1479 // if `self_weak` was never initialised we fall back to the plain
1480 // (sink-less) runner — `record_ack` is still reachable from tests.
1481 let runner = match self.self_weak.get().and_then(Weak::upgrade) {
1482 Some(env_arc) => {
1483 let weak = Arc::downgrade(&env_arc);
1484 let sink: crate::stream::feeder::AckSink =
1485 Arc::new(move |name: &str, vlsn: u64| {
1486 if let Some(env) = weak.upgrade() {
1487 env.record_ack(vlsn, name);
1488 }
1489 });
1490 Arc::new(FeederRunner::new_with_ack_sink(
1491 Arc::clone(&channel),
1492 1,
1493 replica_name.clone(),
1494 sink,
1495 ))
1496 }
1497 None => Arc::new(FeederRunner::new(Arc::clone(&channel), 1)),
1498 };
1499 let runner_clone = Arc::clone(&runner);
1500 let replica_clone = replica_name.clone();
1501
1502 // C-C2b: prefer EnvironmentLogScanner (WAL auto-feed) when env is
1503 // wired; fall back to in-memory queue (manual replicate_entry path)
1504 // otherwise.
1505 let env_opt = self.env_impl.lock().unwrap().clone();
1506
1507 let handle = std::thread::Builder::new()
1508 .name(format!("noxu-feeder-{}", replica_name))
1509 .spawn(move || {
1510 if let Some(env) = env_opt {
1511 if let Some(mut scanner) =
1512 EnvironmentLogScanner::new(&env, None)
1513 {
1514 log::info!(
1515 "FeederRunner for replica '{}': using \
1516 EnvironmentLogScanner (WAL auto-feed)",
1517 replica_clone,
1518 );
1519 let _ = runner_clone.run(&mut scanner);
1520 } else {
1521 log::warn!(
1522 "FeederRunner for replica '{}': \
1523 EnvironmentLogScanner unavailable, \
1524 falling back to in-memory queue",
1525 replica_clone,
1526 );
1527 let mut source = PeerScannerAdapter::new(queue, 0);
1528 let _ = runner_clone.run(&mut source);
1529 }
1530 } else {
1531 let mut source = PeerScannerAdapter::new(queue, 0);
1532 let _ = runner_clone.run(&mut source);
1533 }
1534 log::debug!(
1535 "FeederRunner for replica '{}' exited cleanly",
1536 replica_clone
1537 );
1538 })
1539 .expect("failed to spawn FeederRunner thread");
1540
1541 {
1542 let mut runners = self.active_feeder_runners.lock().unwrap();
1543 runners.insert(replica_name.clone(), Arc::clone(&runner));
1544 }
1545 self.io_threads.lock().unwrap().push(handle);
1546
1547 log::info!(
1548 "Node '{}' (master): FeederRunner thread spawned for replica '{}'",
1549 self.config.node_name.as_str(),
1550 replica_name,
1551 );
1552 }
1553
1554 // -----------------------------------------------------------------------
1555
1556 /// Bootstrap this node's environment by network-restoring all `.ndb`
1557 /// files from `peer_name` via the dispatcher's RESTORE service.
1558 ///
1559 /// Closes findings F2 / F4 of the 2026 review.
1560 ///
1561 /// The standalone `NetworkRestore::execute()` opens raw TCP and
1562 /// expects to drive the legacy `NetworkRestoreServer::start` listener.
1563 /// Production replicated environments host the RESTORE handler on the
1564 /// dispatcher, so this method routes through `execute_via_dispatcher`.
1565 ///
1566 /// `peer_name` must be a known peer in `GroupService`; on success the
1567 /// peer's `.ndb` files are written into `config.env_home`. Returns
1568 /// `Err` if `env_home` is `None`, the peer is unknown, or the restore
1569 /// fails for any reason.
1570 pub fn bootstrap_via_dispatcher(&self, peer_name: &str) -> Result<()> {
1571 let env_home = self.config.env_home.clone().ok_or_else(|| {
1572 RepError::ConfigError(
1573 "bootstrap_via_dispatcher requires env_home in RepConfig"
1574 .into(),
1575 )
1576 })?;
1577 let peer_info = self
1578 .group_service
1579 .get_all_nodes()
1580 .into_iter()
1581 .find(|n| n.name == peer_name)
1582 .ok_or_else(|| {
1583 RepError::ConfigError(format!(
1584 "peer '{}' not registered in group '{}'",
1585 peer_name, self.config.group_name,
1586 ))
1587 })?;
1588
1589 let cfg = NetworkRestoreConfig {
1590 source_node: peer_info.name.clone(),
1591 source_host: peer_info.host.clone(),
1592 source_port: peer_info.port,
1593 retain_log_files: true,
1594 };
1595 let restore = NetworkRestore::new(cfg).with_local_dir(env_home);
1596 restore.execute_via_dispatcher()?;
1597 log::info!(
1598 "Node '{}' bootstrapped via dispatcher from '{}' ({}:{})",
1599 self.config.node_name,
1600 peer_info.name,
1601 peer_info.host,
1602 peer_info.port,
1603 );
1604 Ok(())
1605 }
1606
/// Get replication statistics.
///
/// Returns a shared reference to the `RepStats` associated with this
/// environment.
pub fn get_stats(&self) -> &RepStats {
    &self.stats
}
1615
/// Get the ack tracker.
///
/// Returns a shared reference to the `AckTracker` owned by this
/// environment.
pub fn get_ack_tracker(&self) -> &AckTracker {
    &self.ack_tracker
}
1620
1621 /// Ensure the node state machine is in Unknown state, transitioning
1622 /// from Detached if necessary. This is needed because the state machine
1623 /// only allows Detached -> Unknown -> Master/Replica.
1624 pub fn ensure_unknown_state(&self) -> Result<()> {
1625 let current = self.node_state.get_state();
1626 match current {
1627 NodeState::Unknown => Ok(()),
1628 NodeState::Detached => {
1629 self.node_state.transition_to(NodeState::Unknown)?;
1630 Ok(())
1631 }
1632 // Master and Replica must transition through Unknown before
1633 // joining a new group or reconnecting.
1634 NodeState::Master | NodeState::Replica => {
1635 self.node_state.transition_to(NodeState::Unknown)?;
1636 Ok(())
1637 }
1638 NodeState::Shutdown => {
1639 Err(RepError::StateError("Node is shut down".to_string()))
1640 }
1641 }
1642 }
1643
1644 /// Transition to master state.
1645 ///
1646 /// Transitions this node to Master state for the given election term.
1647 /// As master, the node can accept write operations and feed log entries
1648 /// to replicas.
1649 ///
1650 /// **Active push-feeder** (C-C2): if feeder channels have been registered
1651 /// via [`Self::register_feeder_channel`] before this call, a
1652 /// [`FeederRunner`] background thread is spawned per channel.
1653 ///
1654 /// **WAL-scanner auto-feed path (C-C2b, v3.3.0)**: when
1655 /// [`Self::with_environment`] has been called before `become_master`,
1656 /// each `FeederRunner` thread uses an [`EnvironmentLogScanner`] as its
1657 /// source. Every `log_txn_commit` on the master writes a VLSN-tagged
1658 /// 22-byte WAL entry (via `LogManager::log_with_vlsn`); the scanner
1659 /// discovers these entries and streams them to replicas automatically,
1660 /// without any [`Self::replicate_entry`] call from the application.
1661 ///
1662 /// **Fallback path**: when no `EnvironmentImpl` is wired, the runner
1663 /// reads from the in-memory queue populated by [`Self::replicate_entry`] /
1664 /// [`Self::apply_entry`].
1665 ///
1666 /// If no feeder channels are registered, this call registers per-replica
1667 /// `Feeder` tracker structs for `AckTracker` bookkeeping only. In that
1668 /// case replicas must connect proactively to the `PEER_FEEDER` pull
1669 /// service to receive entries.
1670 pub fn become_master(&self, term: u64) -> Result<()> {
1671 if self.is_shutdown() {
1672 return Err(RepError::StateError(
1673 "Cannot become master: environment is closed".to_string(),
1674 ));
1675 }
1676
1677 // JE invariant: only `Electable` nodes can become master. `Secondary`,
1678 // `Monitor`, and `Arbiter` are not electable and must be rejected at
1679 // the API layer (mirrors JE `ExceptionTest`). See
1680 // `NodeType::can_be_master`.
1681 if !self.config.node_type.can_be_master() {
1682 return Err(RepError::InvalidStateTransition(format!(
1683 "node '{}' has type {} which is not electable as master",
1684 self.config.node_name.as_str(),
1685 self.config.node_type,
1686 )));
1687 }
1688
1689 // Ensure we can reach Master state (may need Detached -> Unknown first)
1690 self.ensure_unknown_state()?;
1691
1692 let old_state = self.node_state.get_state();
1693 self.node_state.transition_to(NodeState::Master)?;
1694 self.master_tracker.set_master(self.config.node_name.as_str(), term);
1695
1696 // --- F9: spawn Feeder trackers for each known replica -------------
1697 //
1698 // Closes finding F9 of the 2026 review.
1699 // The architecture is pull-based: replicas pull from the master's
1700 // `PEER_FEEDER` service via `catch_up_from_peer`. However, the
1701 // master must:
1702 // 1. Track each replica via a `Feeder` so AckTracker bookkeeping
1703 // can attribute replica acks to the right node.
1704 // 2. Push its own writes into `peer_scanner` so replicas pulling
1705 // from `PEER_FEEDER` actually receive entries (`replicate_entry`).
1706 //
1707 // Here we ensure step 1: every known electable peer in the group
1708 // gets a `Feeder` entry.
1709 {
1710 let mut feeders = self.feeders.write();
1711 // Drop any stale feeders left over from a prior role. A
1712 // `Feeder` is just an in-memory tracker; recreating it is
1713 // cheap and avoids state inversion bugs across role changes.
1714 feeders.clear();
1715 for peer in self.group_service.get_all_nodes() {
1716 if peer.name == self.config.node_name {
1717 continue;
1718 }
1719 if peer.node_type != crate::node_type::NodeType::Electable
1720 && peer.node_type != crate::node_type::NodeType::Secondary
1721 {
1722 // Arbiters do not receive log entries.
1723 continue;
1724 }
1725 feeders.push(Feeder::new(peer.name.clone()));
1726 log::debug!(
1727 "Node '{}' (master, term={}): registered Feeder for \
1728 replica '{}'",
1729 self.config.node_name.as_str(),
1730 term,
1731 peer.name,
1732 );
1733 }
1734 }
1735
1736 // For observability, log the count.
1737 log::info!(
1738 "Node '{}' became master for term {} \
1739 (feeder trackers: {} known replicas)",
1740 self.config.node_name.as_str(),
1741 term,
1742 self.feeders.read().len(),
1743 );
1744
1745 // C-C2: spawn FeederRunner threads for pre-registered channels.
1746 //
1747 // When `register_feeder_channel` was called before `become_master`,
1748 // the channels are already in `feeder_channels`. Drain them and
1749 // spawn a FeederRunner per replica. The FeederRunner reads from a
1750 // dedicated `PeerLogScanner` queue (populated by `replicate_entry`
1751 // fan-out) and pushes framed log entries to the replica over the
1752 // registered channel. Acks from the replica are tracked in the
1753 // FeederRunner and visible via `active_feeder_runner_acked_vlsn`.
1754 {
1755 let channels: Vec<(String, Arc<dyn crate::net::Channel>)> = self
1756 .feeder_channels
1757 .lock()
1758 .unwrap()
1759 .iter()
1760 .map(|(k, v)| (k.clone(), Arc::clone(v)))
1761 .collect();
1762 for (replica_name, channel) in channels {
1763 self.spawn_feeder_runner(replica_name, channel);
1764 }
1765 }
1766
1767 // -------------------------------------------------------------------
1768
1769 // Notify listeners
1770 self.notify_listeners(old_state, NodeState::Master);
1771
1772 Ok(())
1773 }
1774
1775 /// Transition to replica state with the given master.
1776 ///
1777 /// Transitions this node to Replica state. The node will receive log
1778 /// entries from the specified master.
1779 ///
1780 /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
1781 /// the method prepares an `EnvironmentLogWriter` so that replicated
1782 /// entries can be written to the local log. The actual network connection
1783 /// is established by the `TcpServiceDispatcher`; this method logs intent.
1784 ///
1785 /// In HA.
1786 pub fn become_replica(&self, master_name: &str) -> Result<()> {
1787 if self.is_shutdown() {
1788 return Err(RepError::StateError(
1789 "Cannot become replica: environment is closed".to_string(),
1790 ));
1791 }
1792
1793 // Ensure we can reach Replica state (may need Detached -> Unknown first)
1794 self.ensure_unknown_state()?;
1795
1796 let old_state = self.node_state.get_state();
1797 self.node_state.transition_to(NodeState::Replica)?;
1798 self.master_tracker.set_master(master_name, 0);
1799 self.replica_stream.set_master(master_name);
1800 self.replica_stream.set_state(
1801 crate::stream::replica_stream::ReplicaStreamState::Connecting,
1802 );
1803
1804 // --- G19: start replica receive loop --------------------------------
1805 //
1806 // Connects to the master's PEER_FEEDER service and runs a
1807 // ReplicaReceiver loop in a background thread. The receiver writes
1808 // replicated entries via EnvironmentLogWriter.
1809 if let Some(env) = self.env_impl.lock().unwrap().clone() {
1810 if let Some(log_mgr) = env.get_log_manager() {
1811 // REP-6: feed the env's SHARED, persisted VLSN index (the one
1812 // flush_to_disk persists and get_vlsn_range / election ranking
1813 // read) into the replica receive loop — NOT a throwaway. Using
1814 // a fresh index would leave the persisted vlsn.idx, the
1815 // reported VLSN range, and the DTVLSN-ranking own_vlsn lagging
1816 // the actually-received stream, widening catch-up (or forcing
1817 // an unnecessary network restore) after a clean restart.
1818 // JE: the replica's VLSNIndex IS the environment's persisted
1819 // index (see VLSNIndex).
1820 let vlsn_index = Arc::clone(&self.vlsn_index);
1821
1822 // Resolve the master's socket address from the GroupService.
1823 let master_addr_opt: Option<SocketAddr> = self
1824 .group_service
1825 .get_all_nodes()
1826 .iter()
1827 .find(|n| n.name == master_name)
1828 .and_then(|info| {
1829 format!("{}:{}", info.host, info.port)
1830 .parse::<SocketAddr>()
1831 .ok()
1832 });
1833
1834 let node_name = self.config.node_name.clone();
1835 let master = master_name.to_string();
1836 let vlsn_index_clone = Arc::clone(&vlsn_index);
1837 let shutdown_flag = self.io_shutdown.load(Ordering::SeqCst);
1838 // Wave 9-A fix 2: capture a Weak<Self> so the I/O thread
1839 // can call `bootstrap_via_dispatcher` automatically when
1840 // the master signals `NeedsRestore`. When the env was
1841 // never registered with `init_self_weak` (raw
1842 // `Arc::new(Self::new(...))` without going through
1843 // `open()` or the test harness), the weak ref is `None`
1844 // and we fall back to operator-driven bootstrap.
1845 let self_weak: Option<Weak<Self>> =
1846 self.self_weak.get().cloned();
1847
1848 let handle = std::thread::Builder::new()
1849 .name(format!("noxu-replica-{}", node_name))
1850 .spawn(move || {
1851 let mut writer = EnvironmentLogWriter::new(
1852 log_mgr,
1853 vlsn_index_clone,
1854 );
1855
1856 let Some(addr) = master_addr_opt else {
1857 log::warn!(
1858 "noxu-replica-{}: master '{}' address not in RepGroup; \
1859 waiting for TCP dispatcher connection",
1860 node_name, master,
1861 );
1862 return;
1863 };
1864
1865 // Catch-up loop: catch up, observe NeedsRestore,
1866 // optionally auto-bootstrap, retry once. We cap
1867 // the retry count at MAX_AUTO_BOOTSTRAP_ATTEMPTS
1868 // (small) so a misbehaving master does not loop
1869 // forever consuming network bandwidth.
1870 const MAX_AUTO_BOOTSTRAP_ATTEMPTS: u32 = 2;
1871 let mut attempts: u32 = 0;
1872 loop {
1873 log::info!(
1874 "noxu-replica-{}: connecting to master '{}' at {}",
1875 node_name, master, addr,
1876 );
1877 match crate::stream::peer_feeder::catch_up_from_peer(
1878 addr, 0, &mut writer,
1879 ) {
1880 Ok(true) => {
1881 log::info!(
1882 "noxu-replica-{}: catch-up complete from '{}'",
1883 node_name, master,
1884 );
1885 return;
1886 }
1887 Ok(false) => {
1888 // F2/F4: master signals NeedsRestore.
1889 // Wave 9-A fix 2: if a Weak<Self> was
1890 // plumbed in, upgrade it and call
1891 // `bootstrap_via_dispatcher` ourselves
1892 // so the replica auto-bootstraps and
1893 // resumes catch-up without operator
1894 // intervention.
1895 log::warn!(
1896 "noxu-replica-{}: master '{}' requires restore",
1897 node_name, master,
1898 );
1899 attempts += 1;
1900 if attempts > MAX_AUTO_BOOTSTRAP_ATTEMPTS {
1901 log::error!(
1902 "noxu-replica-{}: exceeded \
1903 auto-bootstrap attempts ({}); giving up",
1904 node_name,
1905 MAX_AUTO_BOOTSTRAP_ATTEMPTS,
1906 );
1907 return;
1908 }
1909 let env_arc = match self_weak
1910 .as_ref()
1911 .and_then(Weak::upgrade)
1912 {
1913 Some(e) => e,
1914 None => {
1915 // No back-ref or env dropped:
1916 // fall back to operator-driven
1917 // bootstrap and exit cleanly.
1918 log::warn!(
1919 "noxu-replica-{}: no back-reference \
1920 available; operator must call \
1921 bootstrap_via_dispatcher manually",
1922 node_name,
1923 );
1924 return;
1925 }
1926 };
1927 if env_arc.is_shutdown() {
1928 return;
1929 }
1930 log::info!(
1931 "noxu-replica-{}: auto-bootstrapping via \
1932 dispatcher from '{}' (attempt {})",
1933 node_name, master, attempts,
1934 );
1935 match env_arc
1936 .bootstrap_via_dispatcher(&master)
1937 {
1938 Ok(()) => {
1939 log::info!(
1940 "noxu-replica-{}: auto-bootstrap \
1941 succeeded; resuming catch-up",
1942 node_name,
1943 );
1944 // Drop the strong ref before
1945 // re-entering catch-up so we
1946 // do not keep the env alive
1947 // longer than necessary.
1948 drop(env_arc);
1949 continue;
1950 }
1951 Err(e) => {
1952 log::error!(
1953 "noxu-replica-{}: auto-bootstrap \
1954 failed: {}",
1955 node_name, e,
1956 );
1957 return;
1958 }
1959 }
1960 }
1961 Err(e) => {
1962 if !shutdown_flag {
1963 log::error!(
1964 "noxu-replica-{}: error from master '{}': {e}",
1965 node_name, master,
1966 );
1967 }
1968 return;
1969 }
1970 }
1971 }
1972 })
1973 .expect("failed to spawn noxu-replica thread");
1974
1975 self.io_threads.lock().unwrap().push(handle);
1976
1977 log::debug!(
1978 "Node '{}': replica receive thread started for master '{}'",
1979 self.config.node_name.as_str(),
1980 master_name,
1981 );
1982 } else {
1983 log::warn!(
1984 "Node '{}': no LogManager available (read-only env?); \
1985 replica I/O loop not started",
1986 self.config.node_name.as_str(),
1987 );
1988 }
1989 }
1990 // -------------------------------------------------------------------
1991
1992 // Notify listeners
1993 self.notify_listeners(old_state, NodeState::Replica);
1994
1995 log::info!(
1996 "Node '{}' became replica of master '{}'",
1997 self.config.node_name.as_str(),
1998 master_name
1999 );
2000 Ok(())
2001 }
2002
2003 /// Initiate a master transfer to the target node.
2004 ///
2005 ///
2006 ///
2007 /// Transfers the current master state from this node to one of the
2008 /// electable replicas. The replica that is actually chosen to be the new
2009 /// master is the one with which the Master Transfer can be completed most
2010 /// rapidly. The transfer operation ensures that all changes at this node
2011 /// are available at the new master upon conclusion of the operation.
2012 pub fn transfer_master(&self, config: MasterTransferConfig) -> Result<()> {
2013 if self.is_shutdown() {
2014 return Err(RepError::StateError(
2015 "Cannot transfer master: environment is closed".to_string(),
2016 ));
2017 }
2018
2019 if !self.is_master() {
2020 return Err(RepError::InvalidState(
2021 "Master transfer can only be initiated on the master node"
2022 .to_string(),
2023 ));
2024 }
2025
2026 log::info!(
2027 "Node '{}' initiating master transfer to '{}'",
2028 self.config.node_name.as_str(),
2029 config.target_node,
2030 );
2031
2032 // Closes finding F7 of the 2026 review.
2033 //
2034 // Steps:
2035 // 1. Locate the target's address.
2036 // 2. Compute the new term (current observed term + 1).
2037 // 3. Send TRANSFER_MASTER to the target — it will become master.
2038 // 4. Send TRANSFER_MASTER (with the same term + new master name) to
2039 // every other peer so they re-target.
2040 // 5. Demote self to Replica of the target.
2041 //
2042 // The transfer is best-effort: a peer that doesn't ack is logged
2043 // and skipped. The election driver will reconcile any divergence
2044 // on the next election round.
2045
2046 let target_addr = self
2047 .group_service
2048 .get_all_nodes()
2049 .into_iter()
2050 .find(|n| n.name == config.target_node)
2051 .and_then(|n| {
2052 format!("{}:{}", n.host, n.port)
2053 .parse::<std::net::SocketAddr>()
2054 .ok()
2055 })
2056 .ok_or_else(|| {
2057 RepError::ConfigError(format!(
2058 "transfer_master: target '{}' not registered or has bad address",
2059 config.target_node
2060 ))
2061 })?;
2062
2063 let new_term = self.master_tracker.get_term().saturating_add(1);
2064
2065 // 1. Tell the target to become master at the new term.
2066 let target_ack = crate::group_admin::send_transfer_master(
2067 target_addr,
2068 &config.target_node,
2069 new_term,
2070 )
2071 .map_err(|e| {
2072 RepError::NetworkError(format!(
2073 "transfer_master: failed to signal target '{}': {}",
2074 config.target_node, e
2075 ))
2076 })?;
2077 if !target_ack {
2078 return Err(RepError::StateError(format!(
2079 "transfer_master: target '{}' rejected the transfer",
2080 config.target_node
2081 )));
2082 }
2083
2084 // 2. Inform all other peers (best-effort).
2085 for peer in self.group_service.get_all_nodes() {
2086 if peer.name == self.config.node_name
2087 || peer.name == config.target_node
2088 {
2089 continue;
2090 }
2091 if let Ok(addr) = format!("{}:{}", peer.host, peer.port).parse() {
2092 let _ = crate::group_admin::send_transfer_master(
2093 addr,
2094 &config.target_node,
2095 new_term,
2096 );
2097 }
2098 }
2099
2100 // 3. Demote self to Replica of the new master.
2101 self.become_replica(&config.target_node)?;
2102
2103 log::info!(
2104 "Node '{}' transferred master to '{}' at term {}",
2105 self.config.node_name.as_str(),
2106 config.target_node,
2107 new_term,
2108 );
2109 Ok(())
2110 }
2111
2112 /// Register a VLSN (as master, after writing a log entry).
2113 ///
2114 /// Maps the given VLSN to the specified log file position. This is called
2115 /// by the master after it writes a replicated log entry.
2116 pub fn register_vlsn(&self, vlsn: u64, file_number: u32, file_offset: u32) {
2117 self.vlsn_index.register(vlsn, file_number, file_offset);
2118 }
2119
2120 /// Replicate a freshly committed log entry from the master.
2121 ///
2122 /// Closes finding F9 of the 2026 review.
2123 ///
2124 /// Combines `register_vlsn` with a push into the in-memory
2125 /// `peer_scanner` so that downstream replicas pulling from this
2126 /// node's `PEER_FEEDER` service (via `catch_up_from_peer`) can
2127 /// stream the entry without round-tripping through the on-disk
2128 /// log. The local log is still the source of truth; the peer
2129 /// scanner is a fast-path cache that bounds itself via
2130 /// `PeerLogScanner::with_capacity` so old entries are evicted.
2131 ///
2132 /// Should be called by the master after the local commit has
2133 /// fsynced. Calling on a non-master is harmless (the peer
2134 /// scanner cache is also used by replicas) but is logged at trace
2135 /// level for diagnostics.
2136 pub fn replicate_entry(
2137 &self,
2138 vlsn: u64,
2139 file_number: u32,
2140 file_offset: u32,
2141 entry_type: u8,
2142 data: Vec<u8>,
2143 ) {
2144 // Register VLSN -> LSN, dispatching entry type so lastSync /
2145 // lastTxnEnd advance (REP-5; JE VLSNRange.getUpdateForNewMapping).
2146 // An unknown type byte falls back to extend-only registration.
2147 match noxu_log::LogEntryType::from_type_num(entry_type) {
2148 Some(et) => self.vlsn_index.register_with_type(
2149 vlsn,
2150 file_number,
2151 file_offset,
2152 et,
2153 ),
2154 None => self.vlsn_index.register(vlsn, file_number, file_offset),
2155 }
2156 // Pull path: shared peer_scanner serves replicas connecting via
2157 // PeerFeederService (catch_up_from_peer).
2158 self.peer_scanner.push(vlsn, entry_type, data.clone());
2159 // Push path (C-C2): fan out to per-replica FeederRunner queues so
2160 // that threads spawned by become_master can stream entries to each
2161 // registered replica without competing with PeerFeederService.
2162 {
2163 let queues = self.feeder_queues.read().unwrap();
2164 for queue in queues.values() {
2165 queue.push(vlsn, entry_type, data.clone());
2166 }
2167 }
2168 if !self.is_master() {
2169 log::trace!(
2170 "replicate_entry called on non-master node '{}': vlsn={}, type={}",
2171 self.config.node_name,
2172 vlsn,
2173 entry_type,
2174 );
2175 }
2176 }
2177
2178 /// Apply a replicated entry (as replica).
2179 ///
2180 /// Applies a log entry received from the master. This is called by the
2181 /// replica stream handler after receiving an entry from the feeder.
2182 ///
2183 /// `data` is the wire-encoded log-record payload. When the
2184 /// replicated environment has not been wired to a local
2185 /// `noxu_db::Environment` (i.e., before `with_environment` is
2186 /// called) the payload is forwarded into the in-memory peer
2187 /// scanner so that downstream replicas attached to the
2188 /// `PEER_FEEDER` service can re-stream it; the local log is **not**
2189 /// updated. This is documented behaviour rather than a stub — see
2190 /// the 2026 review finding #26 (medium) for the
2191 /// `with_environment`-required local-apply path.
2192 /// cleanup (rep info F35: `_data` placeholder) renames the leading
2193 /// underscore so reviewers don't read it as a TODO.
2194 pub fn apply_entry(
2195 &self,
2196 vlsn: u64,
2197 entry_type: u8,
2198 data: Vec<u8>,
2199 ) -> Result<()> {
2200 if self.is_shutdown() {
2201 return Err(RepError::StateError(
2202 "Cannot apply entry: environment is closed".to_string(),
2203 ));
2204 }
2205
2206 // Register the VLSN in the index, dispatching entry type so
2207 // lastSync/lastTxnEnd advance (REP-5; JE
2208 // VLSNRange.getUpdateForNewMapping).
2209 match noxu_log::LogEntryType::from_type_num(entry_type) {
2210 Some(et) => self.vlsn_index.register_with_type(vlsn, 0, 0, et),
2211 None => self.vlsn_index.register(vlsn, 0, 0),
2212 }
2213
2214 // Push into the peer log scanner so downstream replicas can
2215 // receive this entry via the PEER_FEEDER service.
2216 self.peer_scanner.push(vlsn, entry_type, data.clone());
2217 // C-C2 push path: fan out to per-replica FeederRunner queues.
2218 {
2219 let queues = self.feeder_queues.read().unwrap();
2220 for queue in queues.values() {
2221 queue.push(vlsn, entry_type, data.clone());
2222 }
2223 }
2224
2225 log::trace!(
2226 "Applied replicated entry: vlsn={}, type={}",
2227 vlsn,
2228 entry_type
2229 );
2230 Ok(())
2231 }
2232
2233 /// Record an ack from a replica (as master).
2234 ///
2235 /// Records that the specified replica has acknowledged processing up to
2236 /// the given VLSN. This is used by the master to track durability
2237 /// guarantees.
2238 pub fn record_ack(&self, vlsn: u64, replica_name: &str) {
2239 // Only acks from ELECTABLE replicas count toward the durability
2240 // quorum (JE DurabilityQuorum.replicaAcksQualify: Monitors and
2241 // Secondaries do not qualify). An ack from a non-electable / unknown
2242 // node is recorded for stats elsewhere but must not satisfy the
2243 // ReplicaAckPolicy. If the node is unknown to the group view we err
2244 // toward NOT counting it.
2245 let qualifies = self
2246 .get_rep_group()
2247 .get_node(replica_name)
2248 .map(|n| n.node_type().is_electable())
2249 .unwrap_or(false);
2250 if qualifies {
2251 self.ack_tracker.record_ack(vlsn, replica_name);
2252 }
2253 // REP-9 Part 1: advance the matching `Feeder::acked_vlsn` high-water
2254 // mark (read by `update_dtvlsn_from_feeders` and exposed via
2255 // `get_acked_vlsn`). The production `FeederRunner` previously updated
2256 // only its private `known_replica_vlsn`, so the DTVLSN ranking never
2257 // saw production progress (JE `Feeder.getReplicaTxnEndVLSN`). We
2258 // record the high-water for *any* replica (electable or not); the
2259 // electable filter is reapplied when DTVLSN/quorum is computed.
2260 for feeder in self.feeders.read().iter() {
2261 if feeder.get_replica_name() == replica_name {
2262 feeder.record_ack(vlsn);
2263 break;
2264 }
2265 }
2266 // Recompute the DTVLSN from feeder progress whenever an ack lands.
2267 self.update_dtvlsn_from_feeders();
2268 // REP-9: wake any committer parked in `await_replica_acks`. Its
2269 // satisfaction predicate is the high-water feeder count, not an
2270 // exact-VLSN registration, so we must notify unconditionally (the
2271 // AckTracker's own `record_ack` only notifies when the exact VLSN was
2272 // registered, which the per-frame feeder acks generally are not).
2273 self.ack_tracker.notify_waiters();
2274 }
2275
2276 /// Returns the current Durable Transaction VLSN (D7, JE RepNode.getDTVLSN).
2277 /// The highest VLSN replicated to a majority of electable replicas; 0 if
2278 /// none yet. Used by the election ranking so the most-durable node wins.
2279 pub fn get_dtvlsn(&self) -> u64 {
2280 self.dtvlsn.load(std::sync::atomic::Ordering::Acquire)
2281 }
2282
2283 /// Advance the DTVLSN to `candidate` if it is greater (JE
2284 /// RepNode.updateDTVLSN — an `AtomicLongMax.updateMax`). The DTVLSN can
2285 /// only move forward. Returns the resulting (possibly unchanged) value.
2286 pub fn update_dtvlsn(&self, candidate: u64) -> u64 {
2287 use std::sync::atomic::Ordering;
2288 let mut cur = self.dtvlsn.load(Ordering::Acquire);
2289 while candidate > cur {
2290 match self.dtvlsn.compare_exchange_weak(
2291 cur,
2292 candidate,
2293 Ordering::AcqRel,
2294 Ordering::Acquire,
2295 ) {
2296 Ok(_) => return candidate,
2297 Err(observed) => cur = observed,
2298 }
2299 }
2300 cur
2301 }
2302
2303 /// Set the DTVLSN from the replication stream (JE RepNode.setDTVLSN —
2304 /// used exclusively by the replica, which maintains the DTVLSN from
2305 /// commit/abort records). Still enforced as advance-only via update_max so
2306 /// an out-of-order or stale record cannot move it backward.
2307 pub fn set_dtvlsn(&self, vlsn: u64) {
2308 self.update_dtvlsn(vlsn);
2309 }
2310
2311 /// Master-side DTVLSN computation (D7, JE FeederManager.updateDTVLSN):
2312 /// across the *qualifying* (electable) feeders whose replica-txn-end VLSN
2313 /// exceeds the current DTVLSN, take the minimum; once a SIMPLE_MAJORITY
2314 /// ack-count of them exceeds the current value, advance the DTVLSN to that
2315 /// minimum (a transaction is durable once a majority hold it).
2316 fn update_dtvlsn_from_feeders(&self) {
2317 if !self.is_master() {
2318 return;
2319 }
2320 let curr = self.get_dtvlsn();
2321
2322 // SIMPLE_MAJORITY required-ack-count over the electable group,
2323 // computed the same way as await_replica_acks.
2324 let group = self.get_rep_group();
2325 let electable_peers: u32 = group
2326 .get_nodes()
2327 .iter()
2328 .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2329 .count() as u32;
2330 let electable_count = electable_peers + 1; // +1 for self/master
2331 // required electable acks for SIMPLE_MAJORITY = floor(n/2) replicas
2332 // (the master self-acks; a majority is reached when this many peers
2333 // also hold the VLSN).
2334 let durable_ack_count = electable_count / 2;
2335 if durable_ack_count == 0 {
2336 // Single-node (or majority is self alone): the master's own log is
2337 // immediately durable up to its latest VLSN.
2338 self.update_dtvlsn(self.get_current_vlsn());
2339 return;
2340 }
2341
2342 let mut min = u64::MAX;
2343 let mut ack_count: u32 = 0;
2344 for feeder in self.feeders.read().iter() {
2345 // replicaAcksQualify: only electable feeders count (D6).
2346 let qualifies = group
2347 .get_node(&feeder.get_replica_name())
2348 .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2349 .unwrap_or(false);
2350 if !qualifies {
2351 continue;
2352 }
2353 let replica_vlsn = feeder.get_acked_vlsn();
2354 if replica_vlsn <= curr {
2355 continue;
2356 }
2357 if replica_vlsn < min {
2358 min = replica_vlsn;
2359 }
2360 ack_count += 1;
2361 if ack_count >= durable_ack_count {
2362 // A majority of electable replicas hold >= min: durable.
2363 self.update_dtvlsn(min);
2364 return;
2365 }
2366 }
2367 // DTVLSN unchanged.
2368 }
2369
2370 /// REP-9: count qualifying (electable) feeders whose acked high-water VLSN
2371 /// is `>= commit_vlsn`. This is the Rust equivalent of JE
2372 /// `FeederManager.getNumCurrentAckFeeders(commitVLSN)` — the durability
2373 /// quorum is satisfied when this count reaches the required ack count.
2374 /// Only Electable replicas qualify (D6, JE
2375 /// `DurabilityQuorum.replicaAcksQualify`).
2376 fn count_ack_feeders_ge(&self, commit_vlsn: u64) -> u32 {
2377 let group = self.get_rep_group();
2378 let mut count = 0u32;
2379 for feeder in self.feeders.read().iter() {
2380 let qualifies = group
2381 .get_node(&feeder.get_replica_name())
2382 .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2383 .unwrap_or(false);
2384 // A feeder counts only if it has acked a *real* VLSN at or above
2385 // the commit VLSN. `acked_vlsn == 0` is the NULL sentinel (no ack
2386 // yet) and must never satisfy a commit, even when `commit_vlsn`
2387 // itself is 0 (no replicated commit logged) — mirrors JE
2388 // `getReplicaTxnEndVLSN()` returning NULL_VLSN for a fresh feeder,
2389 // which is not `>=` any commit VLSN.
2390 let acked = feeder.get_acked_vlsn();
2391 if qualifies && acked > 0 && acked >= commit_vlsn {
2392 count += 1;
2393 }
2394 }
2395 count
2396 }
2397
2398 /// Set the state change listener.
2399 ///
2400 ///
2401 ///
2402 /// Sets the listener used to receive asynchronous replication node state
2403 /// change events. Note that there is one listener per replication node,
2404 /// not one per handle. Invoking this method adds to the set of listeners.
2405 ///
2406 /// Invoking this method typically results in an immediate callback to the
2407 /// application via the `on_state_change` method, so that the application
2408 /// is made aware of the existing state of the node at the time the listener
2409 /// is first established.
2410 pub fn set_state_change_listener(
2411 &self,
2412 listener: Arc<dyn StateChangeListener>,
2413 ) {
2414 // Immediately notify the listener of the current state
2415 let current_state = self.node_state.get_state();
2416 let event = StateChangeEvent::new(
2417 current_state,
2418 current_state,
2419 self.get_master_name(),
2420 );
2421 listener.on_state_change(event);
2422
2423 let mut listeners = self.listeners.write();
2424 listeners.push(listener);
2425 }
2426
2427 /// Close the replicated environment.
2428 ///
2429 ///
2430 ///
2431 /// Closes this handle and releases any resources. When closed, daemon
2432 /// threads are stopped, even if they are performing work. The node ceases
2433 /// participation in the replication group. If the node was currently the
2434 /// master, the rest of the group will hold an election.
2435 ///
2436 /// The ReplicatedEnvironment should not be closed while any other type of
2437 /// handle that refers to it is not yet closed.
2438 pub fn close(&self) -> Result<()> {
2439 if self.shutdown.swap(true, Ordering::SeqCst) {
2440 // Already closed
2441 return Ok(());
2442 }
2443
2444 let old_state = self.node_state.get_state();
2445
2446 // Transition to Shutdown state. The state machine allows this from
2447 // any non-Shutdown state.
2448 let _ = self.node_state.transition_to(NodeState::Shutdown);
2449
2450 // Notify listeners of the shutdown
2451 self.notify_listeners(old_state, NodeState::Shutdown);
2452
2453 // Clear feeders
2454 {
2455 let mut feeders = self.feeders.write();
2456 feeders.clear();
2457 }
2458
2459 // C-C2: close all registered feeder channels so FeederRunner threads
2460 // observe ChannelClosed and exit their run() loops cleanly.
2461 {
2462 let channels = self.feeder_channels.lock().unwrap();
2463 for (name, ch) in channels.iter() {
2464 if let Err(e) = ch.close() {
2465 log::debug!(
2466 "close: feeder channel for '{}' already closed: {}",
2467 name,
2468 e
2469 );
2470 }
2471 }
2472 }
2473 // Drop all active runners and queues so their Arcs release.
2474 self.active_feeder_runners.lock().unwrap().clear();
2475 self.feeder_queues.write().unwrap().clear();
2476
2477 // Signal and join all I/O threads spawned by become_master /
2478 // become_replica / start_vlsn_persistence_daemon. The vlsn-flush
2479 // thread does a final flush on its way out so a clean close is
2480 // recoverable. Closes finding F11.
2481 self.io_shutdown.store(true, Ordering::SeqCst);
2482 {
2483 let mut threads = self.io_threads.lock().unwrap();
2484 for handle in threads.drain(..) {
2485 let _ = handle.join();
2486 }
2487 }
2488
2489 // Belt-and-braces: even when no daemon is running (e.g.
2490 // `ReplicatedEnvironment::new` without `open`), persist a final
2491 // snapshot if env_home is configured.
2492 if let Some(ref home) = self.config.env_home
2493 && let Err(e) =
2494 crate::vlsn::persist::flush_to_disk(&self.vlsn_index, home)
2495 {
2496 log::warn!(
2497 "close: failed to persist VLSN index to {}: {}",
2498 home.display(),
2499 e
2500 );
2501 }
2502
2503 // Stop the service dispatcher (the: serviceDispatcher.shutdown()).
2504 if let Some(ref dispatcher) = self.tcp_dispatcher {
2505 dispatcher.stop();
2506 let kind = if dispatcher.is_tls() { "TLS" } else { "TCP" };
2507 log::debug!(
2508 "Node '{}' {} service dispatcher stopped",
2509 self.config.node_name.as_str(),
2510 kind,
2511 );
2512 }
2513
2514 log::info!(
2515 "Replicated environment '{}' in group '{}' closed",
2516 self.config.node_name.as_str(),
2517 self.config.group_name.as_str()
2518 );
2519
2520 Ok(())
2521 }
2522
2523 /// Close this handle and shut down the Replication Group by forcing all
2524 /// active Replicas to exit.
2525 ///
2526 ///
2527 ///
2528 /// This method must be invoked on the node that's currently the Master
2529 /// after all other outstanding handles have been closed.
2530 ///
2531 /// When push-feeder threads are active (registered via
2532 /// [`Self::register_feeder_channel`]), the master first waits up to half
2533 /// of `replica_shutdown_timeout_ms` for each FeederRunner replica to
2534 /// acknowledge all outstanding log entries (VLSN catch-up). Replicas
2535 /// that do not catch up within the budget receive a warning; the master
2536 /// proceeds to send `SHUTDOWN_GROUP` regardless. This closes finding M-4
2537 /// of the v3.x production-readiness review.
2538 ///
2539 /// Replicas that are not fed via a registered channel (pull-based
2540 /// `PeerFeederService` path) are sent `SHUTDOWN_GROUP` without a
2541 /// VLSN-level catch-up wait — that wait requires per-replica ack tracking
2542 /// which the pull path does not yet provide.
2543 pub fn shutdown_group(
2544 &self,
2545 replica_shutdown_timeout_ms: u64,
2546 ) -> Result<()> {
2547 if !self.is_master() {
2548 return Err(RepError::InvalidState(
2549 "shutdownGroup must be invoked on the master".to_string(),
2550 ));
2551 }
2552
2553 log::info!(
2554 "Node '{}' shutting down replication group '{}' (replica_timeout={}ms)",
2555 self.config.node_name.as_str(),
2556 self.config.group_name.as_str(),
2557 replica_shutdown_timeout_ms,
2558 );
2559
2560 // M-4: Wait for active FeederRunner replicas to ack the master's
2561 // current VLSN before sending SHUTDOWN_GROUP. We allow up to half
2562 // the overall timeout for the catch-up phase so the second half
2563 // remains for the SHUTDOWN_GROUP send loop.
2564 let catchup_budget_ms = replica_shutdown_timeout_ms / 2;
2565 if catchup_budget_ms > 0 {
2566 let master_vlsn = self.vlsn_index.get_range().last();
2567 if master_vlsn > 0 {
2568 let runners: Vec<(String, Arc<FeederRunner>)> = self
2569 .active_feeder_runners
2570 .lock()
2571 .unwrap()
2572 .iter()
2573 .map(|(k, v)| (k.clone(), Arc::clone(v)))
2574 .collect();
2575 if !runners.is_empty() {
2576 let catchup_deadline = std::time::Instant::now()
2577 + Duration::from_millis(catchup_budget_ms);
2578 for (name, runner) in &runners {
2579 loop {
2580 let acked = runner.known_replica_vlsn();
2581 if acked >= master_vlsn
2582 || std::time::Instant::now() >= catchup_deadline
2583 {
2584 if acked < master_vlsn {
2585 log::warn!(
2586 "shutdown_group: replica '{}' acked \
2587 VLSN {} < master VLSN {}; proceeding",
2588 name,
2589 acked,
2590 master_vlsn,
2591 );
2592 } else {
2593 log::info!(
2594 "shutdown_group: replica '{}' caught up \
2595 to VLSN {}",
2596 name,
2597 acked,
2598 );
2599 }
2600 break;
2601 }
2602 std::thread::sleep(Duration::from_millis(10));
2603 }
2604 }
2605 }
2606 }
2607 }
2608
2609 // Closes finding F8 of the 2026 review.
2610 //
2611 // Send SHUTDOWN_GROUP to every known peer. The recipient calls
2612 // its own `close()` and the per-connection ADMIN handler
2613 // returns ACK_OK. Any peer that doesn't ack within the
2614 // timeout is logged and the master proceeds. After signalling
2615 // every peer, the master closes its own env.
2616 let deadline = std::time::Instant::now()
2617 + Duration::from_millis(replica_shutdown_timeout_ms);
2618
2619 for peer in self.group_service.get_all_nodes() {
2620 if peer.name == self.config.node_name {
2621 continue;
2622 }
2623 // Don't exceed the deadline waiting for any single peer.
2624 let now = std::time::Instant::now();
2625 if now >= deadline {
2626 log::warn!(
2627 "shutdown_group: deadline reached; skipping remaining peers"
2628 );
2629 break;
2630 }
2631 let addr_str = format!("{}:{}", peer.host, peer.port);
2632 let addr = match addr_str.parse::<SocketAddr>() {
2633 Ok(a) => a,
2634 Err(e) => {
2635 log::warn!(
2636 "shutdown_group: peer '{}' has bad address {}: {}",
2637 peer.name,
2638 addr_str,
2639 e
2640 );
2641 continue;
2642 }
2643 };
2644 match crate::group_admin::send_shutdown_group(addr) {
2645 Ok(true) => log::info!(
2646 "shutdown_group: peer '{}' acknowledged",
2647 peer.name
2648 ),
2649 Ok(false) => log::warn!(
2650 "shutdown_group: peer '{}' rejected the request",
2651 peer.name
2652 ),
2653 Err(e) => log::warn!(
2654 "shutdown_group: peer '{}' unreachable: {}",
2655 peer.name,
2656 e
2657 ),
2658 }
2659 }
2660
2661 // Master closes itself last.
2662 self.close()
2663 }
2664
2665 /// Check if shutdown is in progress.
2666 pub fn is_shutdown(&self) -> bool {
2667 self.shutdown.load(Ordering::SeqCst)
2668 }
2669
2670 /// Notify all registered listeners of a state change.
2671 fn notify_listeners(&self, old_state: NodeState, new_state: NodeState) {
2672 let listeners = self.listeners.read();
2673 if !listeners.is_empty() {
2674 let event = StateChangeEvent::new(
2675 old_state,
2676 new_state,
2677 self.get_master_name(),
2678 );
2679 for listener in listeners.iter() {
2680 listener.on_state_change(event.clone());
2681 }
2682 }
2683 }
2684}
2685
2686// ---------------------------------------------------------------------------
2687// F1: ReplicaAckCoordinator impl wires master commits into the AckTracker.
2688// ---------------------------------------------------------------------------
2689//
2690// `noxu_db::Transaction::commit_with_durability` calls
2691// `await_replica_acks` after the local WAL fsync. This impl:
2692//
2693// 1. Rejects calls on a non-master node with `NotMaster`.
2694// 2. Rejects calls during shutdown with `Shutdown`.
2695// 3. Computes the required ack count from `electable_count` and the
2696// requested policy.
2697// 4. Takes the latest assigned VLSN as the commit VLSN, registers the
2698// ack requirement on the `AckTracker`, and blocks on the tracker's
2699// condvar (no sleep-polling) until the policy is satisfied, the
2700// timeout elapses, or shutdown is signalled.
2701// 5. Cleans up the tracker entry on every exit path.
2702//
2703// Closes finding F1 of the 2026 review.
2704impl ReplicaAckCoordinator for ReplicatedEnvironment {
2705 fn await_replica_acks(
2706 &self,
2707 policy: ReplicaAckPolicyKind,
2708 timeout: Duration,
2709 ) -> std::result::Result<u32, AckWaitError> {
2710 // Fast-path: ReplicaAckPolicy::None never blocks. The trait spec
2711 // says callers may already short-circuit, but be defensive.
2712 if matches!(policy, ReplicaAckPolicyKind::None) {
2713 return Ok(0);
2714 }
2715
2716 if self.is_shutdown() {
2717 return Err(AckWaitError {
2718 kind: AckWaitErrorKind::Shutdown,
2719 needed: 0,
2720 received: 0,
2721 });
2722 }
2723
2724 if !self.is_master() {
2725 return Err(AckWaitError {
2726 kind: AckWaitErrorKind::NotMaster,
2727 needed: 0,
2728 received: 0,
2729 });
2730 }
2731
2732 // Count electable peers (excluding the master) using the
2733 // RepGroup view, which counts Arbiters and Electables
2734 // identically. Only Electable nodes are counted as data
2735 // replicas able to ack a commit. The master itself is
2736 // *implicit*: it is not registered in `group_service` (only
2737 // peers are), so we add 1 to obtain the total electable
2738 // count expected by `ReplicaAckPolicyKind::required_acks`.
2739 let group = self.get_rep_group();
2740 let electable_peers: u32 = group
2741 .get_nodes()
2742 .iter()
2743 .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2744 .count() as u32;
2745 let electable_count: u32 = electable_peers + 1; // +1 for self/master
2746
2747 let needed = policy.required_acks(electable_count);
2748 if needed == 0 {
2749 // Single-node group, or All with only the master itself.
2750 return Ok(0);
2751 }
2752
2753 // REP-9 Part 2: the commit's VLSN is the key. The master assigns a
2754 // VLSN when it logs the TxnCommit (via the shared `wal_vlsn_counter`
2755 // bumped in `EnvironmentImpl::log_txn_commit`), immediately before
2756 // this gate runs. The latest assigned VLSN therefore IS this
2757 // commit's VLSN (the trait contract: "implementations are responsible
2758 // for assigning the commit VLSN internally"). We wait until a quorum
2759 // of qualifying electable replicas have acked a VLSN >= the commit
2760 // VLSN — faithful to JE `FeederManager.getNumCurrentAckFeeders`, which
2761 // counts feeders whose `getReplicaTxnEndVLSN() >= commitVLSN` (a
2762 // high-water `>=` test, NOT an exact-VLSN match).
2763 //
2764 // ponytail: reads the global high-water VLSN, so a concurrent later
2765 // commit can make this gate wait on a slightly higher VLSN than its
2766 // own. That is strictly SAFE (waiting for >= a newer VLSN never
2767 // returns early) and only marginally less precise; thread the
2768 // per-txn VLSN through the trait if exact per-commit granularity is
2769 // ever needed.
2770 let commit_vlsn = self.wal_vlsn_counter.load(Ordering::Acquire);
2771
2772 // Register on the AckTracker too: this is what `record_ack` notifies,
2773 // so the condvar wakes us as acks land. The satisfaction decision
2774 // itself is the high-water feeder count below.
2775 self.ack_tracker.register(commit_vlsn, needed);
2776
2777 // Block on the ack condvar until a quorum of electable feeders hold
2778 // the commit VLSN, the timeout elapses, or shutdown is signalled — no
2779 // spin-poll (JE FeederTxns.TxnInfo uses a per-transaction
2780 // CountDownLatch.await; the AckTracker condvar is the shared-mutex
2781 // equivalent). record_ack notifies us as acks arrive.
2782 let satisfied = self.ack_tracker.wait_for_predicate(
2783 timeout,
2784 || self.count_ack_feeders_ge(commit_vlsn) >= needed,
2785 || self.is_shutdown(),
2786 );
2787 if satisfied {
2788 self.ack_tracker.cleanup_through(commit_vlsn);
2789 return Ok(needed);
2790 }
2791 if self.is_shutdown() {
2792 self.ack_tracker.cleanup_through(commit_vlsn);
2793 return Err(AckWaitError {
2794 kind: AckWaitErrorKind::Shutdown,
2795 needed,
2796 received: 0,
2797 });
2798 }
2799 // Timed out: report the partial ack count (qualifying electable
2800 // feeders holding the commit VLSN) so the caller can surface
2801 // InsufficientReplicas.
2802 let received = self.count_ack_feeders_ge(commit_vlsn);
2803 self.ack_tracker.cleanup_through(commit_vlsn);
2804 Err(AckWaitError { kind: AckWaitErrorKind::Timeout, needed, received })
2805 }
2806
2807 /// X-3: allocate the next VLSN for a recovered XA commit and register
2808 /// `lsn` in the VLSN index so feeders can stream the commit.
2809 ///
2810 /// Increments off the current latest VLSN so the new VLSN is strictly
2811 /// monotonically increasing. In a single-node or master-less environment
2812 /// (not master) returns 0 (NULL_VLSN — harmless, the default).
2813 fn alloc_vlsn_for_recovered_commit(&self, lsn: noxu_util::Lsn) -> u64 {
2814 // Only allocate a VLSN when we are the master; on a replica the
2815 // recovered XA should have been replicated by the original master.
2816 if !self.is_master() {
2817 return 0;
2818 }
2819 let next_vlsn = self.vlsn_index.get_latest_vlsn() + 1;
2820 // A recovered XA commit is a commit log entry; dispatch as TxnCommit
2821 // so lastTxnEnd/lastSync advance (REP-5).
2822 self.vlsn_index.register_with_type(
2823 next_vlsn,
2824 lsn.file_number(),
2825 lsn.file_offset(),
2826 noxu_log::LogEntryType::TxnCommit,
2827 );
2828 log::debug!(
2829 "alloc_vlsn_for_recovered_commit: allocated vlsn={} for lsn={:?}",
2830 next_vlsn,
2831 lsn
2832 );
2833 next_vlsn
2834 }
2835
2836 /// R-3: pre-allocate the next commit VLSN WITHOUT registering in the index.
2837 ///
2838 /// The caller writes the `TxnCommit` WAL entry with this VLSN embedded,
2839 /// then calls `register_recovered_commit_vlsn` with the actual commit LSN.
2840 /// This two-step approach ensures the WAL entry carries the VLSN so the
2841 /// X-14 VLSN rebuild on second crash can find it.
2842 fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64 {
2843 if !self.is_master() {
2844 return 0;
2845 }
2846 // Peek at the next VLSN without registering. The actual registration
2847 // happens in register_recovered_commit_vlsn() after the WAL write.
2848 self.vlsn_index.get_latest_vlsn() + 1
2849 }
2850
2851 /// R-3: register a pre-allocated VLSN in the VLSN index with the actual
2852 /// commit LSN. Called after writing the `TxnCommit` WAL entry.
2853 fn register_recovered_commit_vlsn(
2854 &self,
2855 vlsn: u64,
2856 commit_lsn: noxu_util::Lsn,
2857 ) {
2858 if vlsn == 0 || !self.is_master() {
2859 return;
2860 }
2861 // The pre-allocated VLSN is for a TxnCommit WAL entry; dispatch the
2862 // type so lastTxnEnd/lastSync advance (REP-5).
2863 self.vlsn_index.register_with_type(
2864 vlsn,
2865 commit_lsn.file_number(),
2866 commit_lsn.file_offset(),
2867 noxu_log::LogEntryType::TxnCommit,
2868 );
2869 log::debug!(
2870 "register_recovered_commit_vlsn: registered vlsn={} for commit_lsn={:?}",
2871 vlsn,
2872 commit_lsn
2873 );
2874 }
2875}
2876
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};

    /// Config on "localhost" with the fixed port 5001.
    ///
    /// Intended for unit-style tests that do not need a real TCP bind. The
    /// port may already be taken, so tests that need a live listener should
    /// use `test_config_port0` instead.
    fn test_config(node_name: &str) -> RepConfig {
        RepConfig::builder("test_group", node_name, "localhost")
            .node_port(5001)
            .build()
    }

    /// Config on 127.0.0.1 with port 0, letting the OS choose the port.
    fn test_config_port0(node_name: &str) -> RepConfig {
        RepConfig::builder("test_group", node_name, "127.0.0.1")
            .node_port(0)
            .build()
    }

    #[test]
    fn test_initial_state_is_detached() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        // The node state machine begins in Detached.
        assert_eq!(node.get_state(), NodeState::Detached);
        assert!(!node.is_master());
        assert!(!node.is_replica());
        assert!(!node.is_active());
    }

    #[test]
    fn test_become_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();
        assert_eq!(node.get_state(), NodeState::Master);
        assert!(node.is_master());
        assert!(!node.is_replica());
        assert!(node.is_active());
    }

    #[test]
    fn test_become_replica() {
        let node = ReplicatedEnvironment::new(test_config("node2")).unwrap();
        node.become_replica("node1").unwrap();
        assert_eq!(node.get_state(), NodeState::Replica);
        assert!(!node.is_master());
        assert!(node.is_replica());
        assert!(node.is_active());
    }

    #[test]
    fn test_get_node_name() {
        let node = ReplicatedEnvironment::new(test_config("my_node")).unwrap();
        assert_eq!(node.get_node_name(), "my_node");
    }

    #[test]
    fn test_get_group_name() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(node.get_group_name(), "test_group");
    }

    #[test]
    fn test_register_vlsn_updates_index() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.register_vlsn(1, 0, 100);
        node.register_vlsn(2, 0, 200);
        node.register_vlsn(3, 0, 300);

        assert_eq!(node.get_current_vlsn(), 3);
        let span = node.get_vlsn_range();
        assert_eq!(span.first(), 1);
        assert_eq!(span.last(), 3);
    }

    #[test]
    fn test_record_ack() {
        use crate::node_type::NodeType;
        use crate::rep_node::RepNode;

        let master = ReplicatedEnvironment::new(test_config("master")).unwrap();
        master.become_master(1).unwrap();
        // replicaAcksQualify: durability only counts ELECTABLE replicas, so
        // the acking replica has to be a known electable group member.
        master
            .add_peer(RepNode::new(
                "replica1".to_string(),
                NodeType::Electable,
                "127.0.0.1".to_string(),
                6001,
                2,
            ))
            .unwrap();

        master.register_vlsn(1, 0, 100);
        // One ack is required for VLSN 1; deliver it and check the tracker.
        master.get_ack_tracker().register(1, 1);
        master.record_ack(1, "replica1");
        assert!(master.get_ack_tracker().is_satisfied(1));
    }

    #[test]
    fn test_record_ack_from_non_electable_does_not_qualify() {
        use crate::node_type::NodeType;
        use crate::rep_node::RepNode;

        let master = ReplicatedEnvironment::new(test_config("master")).unwrap();
        master.become_master(1).unwrap();
        // Monitors are not electable, so their acks must be ignored (JE
        // DurabilityQuorum.replicaAcksQualify).
        master
            .add_peer(RepNode::new(
                "monitor1".to_string(),
                NodeType::Monitor,
                "127.0.0.1".to_string(),
                6002,
                3,
            ))
            .unwrap();
        master.register_vlsn(1, 0, 100);
        master.get_ack_tracker().register(1, 1);
        master.record_ack(1, "monitor1");
        assert!(
            !master.get_ack_tracker().is_satisfied(1),
            "non-electable ack must not satisfy durability quorum"
        );
        // Same outcome for a replica the group has never heard of.
        master.record_ack(1, "ghost");
        assert!(!master.get_ack_tracker().is_satisfied(1));
    }

    #[test]
    fn test_authoritative_quorum_met() {
        // electable_total = 1: quorum_size = 1/2+1 = 1, and 0 replicas + the
        // master itself already reaches it.
        assert!(ReplicatedEnvironment::authoritative_quorum_met(0, 1));
        // electable_total = 3 (quorum_size = 2): a master with no connected
        // replicas is in the minority.
        assert!(!ReplicatedEnvironment::authoritative_quorum_met(0, 3));
        // One connected electable replica: 1 + 1 = 2 >= 2.
        assert!(ReplicatedEnvironment::authoritative_quorum_met(1, 3));
        // electable_total = 5 (quorum_size = 3): two connected replicas are
        // required.
        assert!(!ReplicatedEnvironment::authoritative_quorum_met(1, 5));
        assert!(ReplicatedEnvironment::authoritative_quorum_met(2, 5));
    }

    #[test]
    fn test_is_authoritative_master_requires_master_role() {
        // Without the master role a node is never authoritative.
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert!(!node.is_master());
        assert!(!node.is_authoritative_master());
        // A master with no peers at all is authoritative.
        node.become_master(1).unwrap();
        assert!(node.is_authoritative_master());
    }

    #[test]
    fn test_dtvlsn_update_max_advances_only() {
        let node = ReplicatedEnvironment::new(test_config("master")).unwrap();
        assert_eq!(node.get_dtvlsn(), 0);
        assert_eq!(node.update_dtvlsn(10), 10);
        assert_eq!(node.get_dtvlsn(), 10);
        // Smaller candidates never pull the value back.
        assert_eq!(node.update_dtvlsn(5), 10);
        assert_eq!(node.get_dtvlsn(), 10);
        // An equal candidate changes nothing.
        assert_eq!(node.update_dtvlsn(10), 10);
        // The replica-side setter is advance-only as well.
        node.set_dtvlsn(7);
        assert_eq!(node.get_dtvlsn(), 10);
        node.set_dtvlsn(20);
        assert_eq!(node.get_dtvlsn(), 20);
    }

    #[test]
    fn test_dtvlsn_majority_min_across_feeders() {
        use crate::node_type::NodeType;
        use crate::rep_node::RepNode;

        let master = ReplicatedEnvironment::new(test_config("master")).unwrap();
        master.become_master(1).unwrap();
        // Three electable replicas give electable_count = 4 (master
        // included) and durable_ack_count = 2. Counting the master's own
        // ack, DTVLSN moves to the minimum of the two highest qualifying
        // feeders above the current DTVLSN.
        for (idx, peer_name) in ["r1", "r2", "r3"].iter().enumerate() {
            master
                .add_peer(RepNode::new(
                    peer_name.to_string(),
                    NodeType::Electable,
                    "127.0.0.1".to_string(),
                    6100 + idx as u16,
                    (idx + 2) as u32,
                ))
                .unwrap();
        }
        // Feeders with acked VLSNs r1=100, r2=80, r3=50.
        for (peer_name, acked) in [("r1", 100u64), ("r2", 80), ("r3", 50)] {
            let feeder =
                crate::stream::feeder::Feeder::new(peer_name.to_string());
            feeder.record_ack(acked);
            master.feeders.write().push(feeder);
        }
        master.update_dtvlsn_from_feeders();
        // r1(100) and r2(80) are the first two qualifying feeders; their
        // minimum, 80, is held by a majority (2 of 4). r3=50 is below that
        // and is not needed for durability.
        assert!(
            master.get_dtvlsn() >= 80,
            "DTVLSN must reach the majority-min (>=80), got {}",
            master.get_dtvlsn()
        );
    }

    #[test]
    fn test_close_sets_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert!(!node.is_shutdown());

        node.close().unwrap();
        assert!(node.is_shutdown());
        // Closing also moves the state machine to Shutdown.
        assert_eq!(node.get_state(), NodeState::Shutdown);
    }

    #[test]
    fn test_close_is_idempotent() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.close().unwrap();
        // A second close must succeed too.
        node.close().unwrap();
        assert!(node.is_shutdown());
    }

    #[test]
    fn test_cannot_become_master_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.close().unwrap();

        assert!(node.become_master(1).is_err());
    }

    #[test]
    fn test_cannot_become_replica_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.close().unwrap();

        assert!(node.become_replica("master").is_err());
    }

    #[test]
    fn test_cannot_apply_entry_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.close().unwrap();

        assert!(node.apply_entry(1, 0, vec![1, 2, 3]).is_err());
    }

    #[test]
    fn test_cannot_transfer_master_when_not_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_replica("other").unwrap();

        let request = MasterTransferConfig::new(
            "target_node".to_string(),
            Duration::from_secs(30),
        );
        assert!(node.transfer_master(request).is_err());
    }

    #[test]
    fn test_transfer_master_requires_registered_target() {
        // F7: transfer_master now sends an ADMIN TRANSFER_MASTER signal to
        // the target over TCP instead of being a no-op. A target that was
        // never registered fails at address resolution.
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();

        let request = MasterTransferConfig::new(
            "unknown_target".to_string(),
            Duration::from_secs(30),
        );
        let outcome = node.transfer_master(request);
        assert!(
            outcome.is_err(),
            "transfer_master to unregistered target must error"
        );
    }

    #[test]
    fn test_apply_entry_registers_vlsn() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_replica("master").unwrap();

        node.apply_entry(1, 0, vec![1, 2, 3]).unwrap();
        node.apply_entry(2, 0, vec![4, 5, 6]).unwrap();

        assert_eq!(node.get_current_vlsn(), 2);
    }

    #[test]
    fn test_master_name_tracking() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        // Nothing is known about a master before any role is taken.
        assert!(node.get_master_name().is_none());

        // Once this node is master, it reports itself.
        node.become_master(1).unwrap();
        assert_eq!(node.get_master_name(), Some("node1".to_string()));
    }

    #[test]
    fn test_master_to_replica_transition() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        node.become_master(1).unwrap();
        assert_eq!(node.get_master_name(), Some("node1".to_string()));

        // Master -> Replica is a legal transition.
        node.become_replica("other_master").unwrap();
        assert_eq!(node.get_master_name(), Some("other_master".to_string()));
        assert!(node.is_replica());
    }

    #[test]
    fn test_state_change_listener_notification() {
        /// Counts callbacks and remembers the most recent new state.
        struct TestListener {
            call_count: AtomicU32,
            last_new_state: noxu_sync::Mutex<Option<NodeState>>,
        }

        impl StateChangeListener for TestListener {
            fn on_state_change(&self, event: StateChangeEvent) {
                self.call_count.fetch_add(1, AtomicOrdering::SeqCst);
                *self.last_new_state.lock() = Some(event.new_state);
            }
        }

        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        let listener = Arc::new(TestListener {
            call_count: AtomicU32::new(0),
            last_new_state: noxu_sync::Mutex::new(None),
        });
        let calls = || listener.call_count.load(AtomicOrdering::SeqCst);

        // Installing the listener fires one notification immediately.
        node.set_state_change_listener(listener.clone());
        assert_eq!(calls(), 1);

        // A real state change fires the next one.
        node.become_master(1).unwrap();
        assert_eq!(calls(), 2);
        assert_eq!(*listener.last_new_state.lock(), Some(NodeState::Master));
    }

    #[test]
    fn test_close_notifies_listeners() {
        /// Latches once a transition into Shutdown has been observed.
        struct ShutdownListener {
            shutdown_seen: AtomicBool,
        }

        impl StateChangeListener for ShutdownListener {
            fn on_state_change(&self, event: StateChangeEvent) {
                if event.new_state == NodeState::Shutdown {
                    self.shutdown_seen.store(true, AtomicOrdering::SeqCst);
                }
            }
        }

        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        let listener = Arc::new(ShutdownListener {
            shutdown_seen: AtomicBool::new(false),
        });

        // Installation reports the current (Detached) state.
        node.set_state_change_listener(listener.clone());

        // Take the master role first so closing is a meaningful transition.
        node.become_master(1).unwrap();
        assert!(!listener.shutdown_seen.load(AtomicOrdering::SeqCst));

        node.close().unwrap();
        assert!(listener.shutdown_seen.load(AtomicOrdering::SeqCst));
    }

    #[test]
    fn test_shutdown_group_requires_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_replica("other").unwrap();

        assert!(node.shutdown_group(5000).is_err());
    }

    #[test]
    fn test_shutdown_group_as_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();

        assert!(node.shutdown_group(5000).is_ok());
        assert!(node.is_shutdown());
    }

    #[test]
    fn test_get_config() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(node.get_config().node_name, "node1");
        assert_eq!(node.get_config().group_name, "test_group");
    }

    #[test]
    fn test_get_stats() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        // Smoke test: fetching stats must not panic.
        let _stats = node.get_stats();
    }

    // -----------------------------------------------------------------------
    // TCP dispatcher tests (H-5 / H-7)
    // -----------------------------------------------------------------------

    #[test]
    fn test_tcp_dispatcher_starts_on_new() {
        // Port 0 asks the OS for an ephemeral port.
        let node =
            ReplicatedEnvironment::new(test_config_port0("tcp_node")).unwrap();
        // Construction must have started the dispatcher on a real port.
        let bound = node.bound_addr();
        assert!(bound.is_some(), "expected a bound address");
        let bound = bound.unwrap();
        assert_ne!(bound.port(), 0, "OS should assign a non-zero port");
    }

    #[test]
    fn test_tcp_dispatcher_stops_on_close() {
        let node =
            ReplicatedEnvironment::new(test_config_port0("tcp_node2")).unwrap();
        // A missing dispatcher counts as "not running".
        let dispatcher_running =
            || node.tcp_dispatcher.as_ref().is_some_and(|d| d.is_running());

        assert!(dispatcher_running());

        node.close().unwrap();

        assert!(
            !dispatcher_running(),
            "dispatcher should be stopped after close"
        );
    }

    #[test]
    fn test_tcp_dispatcher_accepts_connection() {
        use crate::net::Channel;
        use crate::net::ServiceHandler;
        use crate::net::service_dispatcher::connect_to_service;

        /// Counts connections and echoes the first message it receives.
        struct PingHandler {
            count: AtomicU32,
        }
        impl ServiceHandler for PingHandler {
            fn service_name(&self) -> &str {
                "ping"
            }
            fn handle(&self, ch: Box<dyn Channel>) -> crate::error::Result<()> {
                self.count.fetch_add(1, AtomicOrdering::SeqCst);
                if let Ok(Some(msg)) = ch.receive(Duration::from_secs(2)) {
                    let _ = ch.send(&msg);
                }
                Ok(())
            }
        }

        let node =
            ReplicatedEnvironment::new(test_config_port0("tcp_node3")).unwrap();
        let addr = node.bound_addr().expect("dispatcher must be bound");

        if let Some(disp) = node.tcp_dispatcher.as_ref() {
            // Attach the ping service to the already-running dispatcher.
            let handler = Arc::new(PingHandler { count: AtomicU32::new(0) });
            disp.register("ping", handler.clone());

            // Brief pause for the accept thread.
            std::thread::sleep(Duration::from_millis(20));

            let client = connect_to_service(addr, "ping").unwrap();
            client.send(b"hello").unwrap();
            let echoed = client.receive(Duration::from_secs(2)).unwrap();
            assert_eq!(echoed, Some(b"hello".to_vec()));

            assert_eq!(handler.count.load(AtomicOrdering::SeqCst), 1);
        }

        node.close().unwrap();
    }

    #[test]
    fn test_become_master_auto_transitions_from_detached() {
        // Detached -> Unknown -> Master is the required path; become_master()
        // is expected to take the intermediate step on its own.
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(node.get_state(), NodeState::Detached);
        node.become_master(1).unwrap();
        assert_eq!(node.get_state(), NodeState::Master);
    }

    #[test]
    fn test_become_replica_auto_transitions_from_detached() {
        // Detached -> Unknown -> Replica is the required path;
        // become_replica() is expected to take the intermediate step itself.
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(node.get_state(), NodeState::Detached);
        node.become_replica("master_node").unwrap();
        assert_eq!(node.get_state(), NodeState::Replica);
    }

    #[test]
    fn test_cannot_transfer_master_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();
        node.close().unwrap();

        let request = MasterTransferConfig::new(
            "target".to_string(),
            Duration::from_secs(30),
        );
        assert!(node.transfer_master(request).is_err());
    }

    #[test]
    fn test_full_lifecycle() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        // Fresh node: Detached.
        assert_eq!(node.get_state(), NodeState::Detached);

        // Take the master role.
        node.become_master(1).unwrap();
        assert!(node.is_master());

        // Log two VLSNs as master.
        node.register_vlsn(1, 0, 100);
        node.register_vlsn(2, 0, 200);

        // A replica acks both.
        node.record_ack(1, "replica1");
        node.record_ack(2, "replica1");

        // Simulated failover: this node now follows node2.
        node.become_replica("node2").unwrap();
        assert!(node.is_replica());

        // Replay an entry streamed from the new master.
        node.apply_entry(3, 0, vec![7, 8, 9]).unwrap();

        // Shut down.
        node.close().unwrap();
        assert!(node.is_shutdown());
    }

    /// `with_environment` must register the RESTORE service on the TCP
    /// dispatcher lazily when `config.env_home` was absent at construction.
    ///
    /// This mirrors `RepNode.envSetup()`, which registers the restore handler
    /// when the environment is wired into the replicated node.
    #[test]
    fn test_restore_registered_lazily_via_with_environment() {
        use noxu_dbi::EnvironmentImpl;
        use tempfile::TempDir;

        let dir = TempDir::new().expect("temp dir");

        // No env_home here: the dispatcher starts without a RESTORE handler.
        let config = RepConfig::builder("test_group", "node1", "127.0.0.1")
            .node_port(0)
            .build();

        let rep_env = ReplicatedEnvironment::new(config).unwrap();

        // The flag starts out clear.
        assert!(!rep_env.restore_registered.load(AtomicOrdering::SeqCst));

        // Attach a real EnvironmentImpl so get_env_home() yields the temp dir.
        let env_impl = Arc::new(
            EnvironmentImpl::new(dir.path(), false, false).expect("open env"),
        );
        rep_env.with_environment(env_impl);

        // Attaching the environment must have registered RESTORE.
        assert!(rep_env.restore_registered.load(AtomicOrdering::SeqCst));
    }

    /// With `config.env_home` present at construction, the RESTORE service is
    /// registered right away rather than deferred.
    #[test]
    fn test_restore_registered_eagerly_when_env_home_in_config() {
        use tempfile::TempDir;

        let dir = TempDir::new().expect("temp dir");

        let config = RepConfig::builder("test_group", "node2", "127.0.0.1")
            .node_port(0)
            .env_home(dir.path())
            .build();

        let rep_env = ReplicatedEnvironment::new(config).unwrap();

        // env_home came with the config, so no deferral is expected.
        assert!(rep_env.restore_registered.load(AtomicOrdering::SeqCst));
    }
}