noxu_rep/replicated_environment.rs
1//! The main replicated environment API.
2//!
3//!
4//! A replicated database environment that is a node in a replication group.
5//! This is the entry point for replication. It wraps a standard Environment
6//! and adds replication capabilities including master election, replica
7//! streaming, and commit acknowledgments.
8//!
9//! # Replication node states
10//!
11//! The replication node state determines the operations that the application
12//! can perform against its replicated environment. The state transitions
13//! visible to the application can be summarized by the regular expression:
14//!
15//! ```text
16//! [ MASTER | REPLICA | UNKNOWN ]+ DETACHED
17//! ```
18//!
19//! When the first handle to a `ReplicatedEnvironment` is created and the node
20//! is brought up, the node usually establishes Master or Replica state. These
21//! states are preceded by the Unknown state. As various remote nodes become
22//! unavailable and elections are held, the local node may change between
23//! Master and Replica states, always with a (usually brief) transition through
24//! Unknown state.
25//!
26//! When the environment is closed, the node transitions to the Detached state.
27
28use noxu_dbi::{
29 AckWaitError, AckWaitErrorKind, EnvironmentImpl, ReplicaAckCoordinator,
30 ReplicaAckPolicyKind,
31};
32use noxu_sync::RwLock;
33use std::net::SocketAddr;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::OnceLock;
37use std::sync::Weak;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use crate::ack_tracker::AckTracker;
42use crate::elections::election_service::{
43 ELECTION_SERVICE_NAME, ElectionAcceptorState, ElectionService,
44};
45use crate::elections::master_tracker::MasterTracker;
46use crate::error::{RepError, Result};
47use crate::group_service::GroupService;
48use crate::master_transfer::MasterTransferConfig;
49use crate::net::service_dispatcher::{
50 AnyServiceDispatcher, TcpServiceDispatcher,
51};
52use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
53use crate::network_restore_server::{
54 NetworkRestoreServer, RESTORE_SERVICE_NAME,
55};
56use crate::node_state::{NodeState, NodeStateMachine};
57use crate::rep_config::RepConfig;
58use crate::rep_stats::RepStats;
59use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
60use crate::stream::feeder::EnvironmentLogScanner;
61use crate::stream::feeder::Feeder;
62use crate::stream::feeder::FeederRunner;
63use crate::stream::peer_feeder::PeerScannerAdapter;
64use crate::stream::peer_feeder::{
65 PEER_FEEDER_SERVICE_NAME, PeerFeederService, PeerLogScanner,
66};
67use crate::stream::replica_stream::{EnvironmentLogWriter, ReplicaStream};
68use crate::stream::syncup::{
69 Matchpoint, RollbackDecision, find_matchpoint, verify_rollback,
70};
71use crate::stream::syncup_reader::VlsnIndexView;
72use crate::vlsn::vlsn_index::VlsnIndex;
73use crate::vlsn::vlsn_range::VlsnRange;
74use std::collections::HashMap;
75
/// Heartbeat timeout (30 seconds) handed to the `MasterTracker` that
/// `ReplicatedEnvironment::new` builds for master liveness detection.
const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::new(30, 0);
78
79/// A replicated database environment.
80///
81///
82///
83/// This is the entry point for replication. It wraps a standard Environment
84/// and adds replication capabilities including master election, replica
85/// streaming, and commit acknowledgments.
86///
87/// High Availability (HA) provides a replicated, embedded database
88/// management system which provides fast, reliable, and scalable data
89/// management. HA enables replication of an environment across a Replication
90/// Group. A `ReplicatedEnvironment` is a single node in the replication group.
91///
92/// `ReplicatedEnvironment` wraps a standard `Environment`. All database
93/// operations are executed in the same fashion in both replicated and
94/// non-replicated applications. A `ReplicatedEnvironment` must be
95/// transactional. All replicated databases created in the replicated
96/// environment must be transactional as well.
97///
98/// A `ReplicatedEnvironment` joins its replication group when it is created.
99/// When `new()` returns, the node will have established contact with the other
100/// members of the group and will be ready to service operations.
101///
102/// Replicated environments can be created with node type Electable or
103/// Secondary. Electable nodes can be masters or replicas, and participate in
104/// both master elections and commit durability decisions. Secondary nodes can
105/// only be replicas, not masters, and do not participate in either elections or
106/// durability decisions.
107///
108/// # Example
109///
110/// ```ignore
111/// use noxu_rep::{ReplicatedEnvironment, RepConfig};
112///
113/// let config = RepConfig::builder("my_group", "node1", "localhost")
114/// .node_port(5001)
115/// .build();
116/// let rep_env = ReplicatedEnvironment::new(config).unwrap();
117/// ```
118/// Outcome of [`ReplicatedEnvironment::syncup_with_feeder`] — the action taken
119/// by a live diverged-tail syncup. Port of the branch in JE
120/// `ReplicaFeederSyncup.execute` between a soft rollback and a network restore.
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub enum SyncupAction {
123 /// The divergent tail was rolled back to the matchpoint; resume streaming
124 /// from `start_vlsn` (`matchpoint + 1`). `matchpoint_vlsn == last VLSN`
125 /// means the replica was not diverged and nothing was truncated.
126 RolledBack { matchpoint_vlsn: u64, start_vlsn: u64 },
127 /// No safe rollback (no common matchpoint, or it would cross a committed
128 /// txn); the replica must do a full network restore.
129 NeedsRestore,
130}
131
132pub struct ReplicatedEnvironment {
133 /// The replication configuration for this node.
134 config: RepConfig,
135 /// Tracks the current node state (Detached, Unknown, Master, Replica).
136 node_state: NodeStateMachine,
137 /// Service for managing the replication group membership.
138 group_service: GroupService,
139 /// Maps VLSNs to log file positions.
140 ///
141 /// Wrapped in `Arc` so that background daemons (election driver,
142 /// VLSN-index persistence flusher) can share access without
143 /// borrowing the env. Closes finding F11 (
144 /// the 2026 review).
145 vlsn_index: Arc<VlsnIndex>,
146 /// Tracks acknowledgments from replicas (used by master).
147 ack_tracker: AckTracker,
148 /// Replication statistics.
149 stats: RepStats,
150 /// Active feeder threads (master -> replica streams).
151 feeders: RwLock<Vec<Feeder>>,
152 /// Replica stream for receiving updates from the master.
153 replica_stream: ReplicaStream,
154 /// Tracks the current master node.
155 master_tracker: MasterTracker,
156 /// State change listeners.
157 listeners: RwLock<Vec<Arc<dyn StateChangeListener>>>,
158 /// Shutdown flag.
159 shutdown: AtomicBool,
160 /// Service dispatcher — listens on the replication port and routes
161 /// incoming connections to the appropriate service handler (feeder, etc.).
162 ///
163 /// `Plain`: plain TCP (default / Phase-2 behaviour).
164 /// `Tls`: TLS + mTLS enforcement (Phase 3, when `RepConfig::tls_config` is set
165 /// and `transport_kind` is `Tls`).
166 ///
167 /// `None` only when the bind address cannot be resolved.
168 tcp_dispatcher: Option<AnyServiceDispatcher>,
169 /// The address the `tcp_dispatcher` is actually bound to (may differ from
170 /// the configured port when port 0 is used in tests).
171 bound_addr: Option<SocketAddr>,
172
173 /// Optional live `EnvironmentImpl` wired in via [`with_environment`].
174 ///
175 /// When set, `become_master` spawns a `FeederRunner` per replica using
176 /// `EnvironmentLogScanner`, and `become_replica` spawns a
177 /// `ReplicaReceiver` thread using `EnvironmentLogWriter`.
178 ///
179 /// In HA.
180 env_impl: StdMutex<Option<Arc<EnvironmentImpl>>>,
181
182 /// Background I/O thread handles spawned during state transitions.
183 ///
184 /// Stored so that `close()` can join them cleanly. Each handle is
185 /// `Option` so we can `take()` it in `close()`.
186 io_threads: StdMutex<Vec<std::thread::JoinHandle<()>>>,
187
188 /// Shutdown flag shared with I/O threads so they terminate when the
189 /// environment is closed.
190 ///
191 /// Wrapped in an `Arc` so the replica receive thread (which connects to
192 /// an upstream feeder via `catch_up_from_peer`) can poll it directly and
193 /// break out of its blocking receive loop on close — otherwise a node
194 /// whose upstream stays connected (e.g. a mid-tier replica in a chain,
195 /// closed before its upstream) would never observe the close and
196 /// `close()`'s thread-join would hang.
197 io_shutdown: Arc<AtomicBool>,
198
199 /// Whether the RESTORE service has been registered on the TCP dispatcher.
200 ///
201 /// When `config.env_home` is `None` at construction time, registration is
202 /// deferred until `with_environment()` provides the env home path.
203 restore_registered: AtomicBool,
204
205 /// In-memory log queue used by the peer feeder service.
206 ///
207 /// When this node is a replica, `apply_entry()` pushes each received log
208 /// entry here. The `PeerFeederService` registered on the TCP dispatcher
209 /// reads from this queue to stream entries to downstream replicas that
210 /// are behind this node (peer-to-peer log distribution, HA style).
211 peer_scanner: Arc<PeerLogScanner>,
212
213 /// Durable Transaction VLSN (D7, JE RepNode.dtvlsn): the highest VLSN
214 /// known to have been replicated to a *majority* of the electable
215 /// replicas. On a master it is computed from feeder ack/heartbeat progress
216 /// (`update_dtvlsn_from_feeders`); on a replica it is set from commit/abort
217 /// records in the stream (`set_dtvlsn`). It advances monotonically (an
218 /// `update_max`). 0 = NULL_VLSN. Used by the election ranking (D2) so the
219 /// most-durable node, not merely the highest-raw-VLSN node, wins.
220 dtvlsn: std::sync::atomic::AtomicU64,
221
222 /// Shared acceptor state used by the ELECTION service handler.
223 /// The election driver updates `own_vlsn` / `own_term` here as the
224 /// node progresses; incoming acceptor sessions read it on every
225 /// connection so their replies always reflect the local node's
226 /// most recent state. Closes finding F6.
227 election_state: Arc<ElectionAcceptorState>,
228
229 /// Self-referential `Weak` populated once the env has been wrapped
230 /// in an `Arc`. Used by the replica I/O thread spawned in
231 /// `become_replica` so it can call `bootstrap_via_dispatcher` when
232 /// the master signals `NeedsRestore`.
233 ///
234 /// Populated lazily via [`Self::init_self_weak`] from `open()` and
235 /// the test harness. When unset (callers that build the env via
236 /// raw `Arc::new(Self::new(...))` and never call `init_self_weak`)
237 /// the I/O thread falls back to operator-driven bootstrap.
238 self_weak: OnceLock<Weak<Self>>,
239
240 // -----------------------------------------------------------------------
241 // C-C2: active push-feeder infrastructure
242 // -----------------------------------------------------------------------
243 /// Per-replica channels injected via [`Self::register_feeder_channel`].
244 ///
245 /// When [`Self::become_master`] is called (or when the node is already
246 /// master), a [`FeederRunner`] thread is spawned for each registered
247 /// channel, actively streaming entries to that replica over the channel.
248 ///
249 /// Using `register_feeder_channel` is the primary integration point for
250 /// the push-based feeder path. Production deployments wire in a
251 /// `TcpChannel`; test code uses `LocalChannelPair`.
252 feeder_channels: StdMutex<HashMap<String, Arc<dyn crate::net::Channel>>>,
253
254 /// Per-replica dedicated entry queues backing the push-feeder path.
255 ///
256 /// Each `FeederRunner` thread reads exclusively from its replica's queue.
257 /// [`Self::replicate_entry`] and [`Self::apply_entry`] fan out into all
258 /// registered queues so the push runners receive entries without competing
259 /// with [`PeerFeederService`] for ownership of `peer_scanner`.
260 feeder_queues: std::sync::RwLock<HashMap<String, Arc<PeerLogScanner>>>,
261
262 /// Active `FeederRunner` references for acked-VLSN queries and
263 /// clean shutdown (M-4: wait for replicas to catch up).
264 active_feeder_runners: StdMutex<HashMap<String, Arc<FeederRunner>>>,
265
266 /// Monotone VLSN counter shared with the wired `EnvironmentImpl`.
267 ///
268 /// Installed into the environment via
269 /// `EnvironmentImpl::set_replication_vlsn_counter()` when
270 /// `with_environment` is called. Each `log_txn_commit` on the master
271 /// atomically increments this counter and writes a VLSN-tagged WAL entry,
272 /// which `EnvironmentLogScanner` then picks up without any
273 /// `replicate_entry` call from the application.
274 wal_vlsn_counter: Arc<std::sync::atomic::AtomicU64>,
275
276 /// Count of downstream connections this node has served via the JE
277 /// `Feeder`/`MasterFeederSource` mechanism (`FeederRunner +
278 /// EnvironmentLogScanner` reading this node's WAL). Shared with the
279 /// node's [`crate::stream::peer_feeder::PeerFeederService`] when a WAL
280 /// source is registered (master in `become_master`, or a cascading
281 /// replica in `become_replica`). A non-zero value PROVES this node fed
282 /// a downstream by the SAME mechanism the master uses — the cascade does
283 /// not diverge. See [`Self::wal_feeds_served`].
284 wal_feeds_served: Arc<std::sync::atomic::AtomicU64>,
285
286 /// REP-10 (C): the replica-side consistency tracker, built from the
287 /// REP-7 `last_applied_vlsn` handle when the replica replay thread starts
288 /// (`become_replica`). `None` on a master or before replay is wired.
289 ///
290 /// A read that begins on a replica with a non-`NoConsistency` policy waits
291 /// on this tracker (`begin_read_consistency`). Port of
292 /// `RepImpl.getConsistency` / `Replica.getConsistencyTracker`.
293 consistency_tracker: StdMutex<Option<crate::ConsistencyTracker>>,
294}
295
296impl ReplicatedEnvironment {
297 /// Create a new replicated environment.
298 ///
299 ///
300 ///
301 /// Creates a replicated environment handle and starts participating in the
302 /// replication group. The node's state is determined when it joins the
303 /// group, and mastership is not preconfigured. If the group has no current
304 /// master, creation will trigger an election to determine whether this node
305 /// will participate as a Master or a Replica.
306 ///
307 /// A brand new node will always join an existing group as a Replica, unless
308 /// it is the very first electable node that is creating the group. In that
309 /// case it joins as the Master of the newly formed singleton group.
310 pub fn new(config: RepConfig) -> Result<Self> {
311 // mTLS Phase 2 (v3.1.0): peer_allowlist enforcement is real at the
312 // TLS channel layer (TlsTcpChannelListener::bind_with_tls_and_allowlist).
313 // Phase 3 (this release): when RepConfig::tls_config is set AND
314 // transport_kind is Tls, the service dispatcher itself enforces mTLS
315 // via TlsTcpServiceDispatcher. For the remaining cases (no TlsConfig
316 // or non-TLS transport) keep the Phase-2 accurate warn.
317 if !config.peer_allowlist.is_empty() {
318 match config.transport_kind {
319 crate::rep_config::RepTransportKind::Tls => {
320 if config.tls_config.is_some() {
321 log::info!(
322 "[{}] peer_allowlist ({} entries) + tls_config set; \
323 mTLS will be enforced on the service dispatcher.",
324 config.node_name,
325 config.peer_allowlist.len(),
326 );
327 } else {
328 log::info!(
329 "[{}] peer_allowlist configured ({} entries) but \
330 tls_config is None — the service dispatcher will \
331 use plain TCP. Set RepConfig::tls_config to \
332 activate end-to-end mTLS on this path.",
333 config.node_name,
334 config.peer_allowlist.len(),
335 );
336 }
337 }
338 _ => {
339 log::warn!(
340 "[{}] peer_allowlist is configured ({} entries) but \
341 transport_kind is not Tls — the allowlist has no \
342 effect without TLS transport. Set \
343 RepTransportKind::Tls to activate mTLS enforcement.",
344 config.node_name,
345 config.peer_allowlist.len(),
346 );
347 }
348 }
349 }
350 let node_state = NodeStateMachine::new();
351 let group_service = GroupService::new(config.group_name.clone());
352 let vlsn_index = {
353 // F11: try to load a previously persisted vlsn.idx from
354 // env_home if one exists. A successfully loaded index lets a
355 // restarted replica resume from where it left off without a
356 // full network restore; a missing or corrupt file falls back
357 // to a fresh in-memory index (caller will need to bootstrap).
358 if let Some(ref home) = config.env_home {
359 match crate::vlsn::persist::load_from_disk(home) {
360 Ok(Some(idx)) => {
361 log::info!(
362 "Node '{}' loaded persisted VLSN index from {} \
363 ({} entries, latest vlsn={})",
364 config.node_name,
365 home.display(),
366 idx.snapshot_entries().len(),
367 idx.get_latest_vlsn(),
368 );
369 Arc::new(idx)
370 }
371 Ok(None) => Arc::new(VlsnIndex::new(10)),
372 Err(e) => {
373 log::warn!(
374 "Node '{}' failed to load persisted VLSN index \
375 from {}: {} (treating as fresh node — network \
376 restore required)",
377 config.node_name,
378 home.display(),
379 e
380 );
381 // Best-effort: remove the corrupt file so the
382 // next persist cycle writes a clean one. A
383 // missing file is the "fresh node" baseline.
384 let _ = std::fs::remove_file(
385 crate::vlsn::persist::index_path(home),
386 );
387 Arc::new(VlsnIndex::new(10))
388 }
389 }
390 } else {
391 Arc::new(VlsnIndex::new(10))
392 }
393 };
394 let ack_tracker = AckTracker::new();
395 let stats = RepStats::new();
396 let feeders = RwLock::new(Vec::new());
397 let replica_stream = ReplicaStream::new();
398 let master_tracker = MasterTracker::new(DEFAULT_HEARTBEAT_TIMEOUT);
399
400 // Start the service dispatcher.
401 //
402 // Phase 3: when RepConfig::tls_config is set AND transport_kind is Tls,
403 // start a TlsTcpServiceDispatcher (mTLS enforced). Otherwise fall back
404 // to the plain-TCP TcpServiceDispatcher.
405 let listen_addr_str =
406 format!("{}:{}", config.node_host, config.node_port);
407 let mut restore_registered_init = false;
408
409 // Returns (AnyServiceDispatcher, bound_addr) or (None, None) on error.
410 let (tcp_dispatcher, bound_addr) = match listen_addr_str
411 .parse::<SocketAddr>()
412 {
413 Ok(addr) => {
414 let build_result: Result<(AnyServiceDispatcher, SocketAddr)> =
415 Self::build_dispatcher(&config, addr);
416 match build_result {
417 Ok((dispatcher, bound)) => {
418 // Register the network restore handler.
419 if let Some(ref home) = config.env_home {
420 let restore_server =
421 NetworkRestoreServer::new(home.clone());
422 dispatcher.register(
423 RESTORE_SERVICE_NAME,
424 Arc::new(restore_server),
425 );
426 log::debug!(
427 "Node '{}' RESTORE service registered \
428 (env_home={})",
429 config.node_name,
430 home.display(),
431 );
432 restore_registered_init = true;
433 }
434 let kind =
435 if dispatcher.is_tls() { "TLS" } else { "TCP" };
436 log::info!(
437 "Node '{}' {} service dispatcher started on {}",
438 config.node_name,
439 kind,
440 bound
441 );
442 (Some(dispatcher), Some(bound))
443 }
444 Err(e) => {
445 log::warn!(
446 "Node '{}' failed to start service dispatcher \
447 on {}: {}",
448 config.node_name,
449 listen_addr_str,
450 e
451 );
452 (None, None)
453 }
454 }
455 }
456 Err(e) => {
457 log::warn!(
458 "Node '{}' cannot parse listen address '{}': {}",
459 config.node_name,
460 listen_addr_str,
461 e
462 );
463 (None, None)
464 }
465 };
466
467 // Build the in-memory peer log scanner; register the peer feeder
468 // service on the dispatcher so downstream replicas can connect.
469 let peer_scanner = Arc::new(PeerLogScanner::new());
470 // F5/F31: build the acceptor state with persistence enabled when
471 // env_home is configured. Crash-durable promises are required
472 // for the Paxos safety invariant after a process restart.
473 let election_state =
474 Arc::new(if let Some(ref home) = config.env_home {
475 ElectionAcceptorState::with_env_home(
476 config.node_name.clone(),
477 1,
478 home,
479 )
480 } else {
481 ElectionAcceptorState::new(config.node_name.clone(), 1)
482 });
483 if let Some(ref dispatcher) = tcp_dispatcher {
484 let service = PeerFeederService::new(Arc::clone(&peer_scanner));
485 dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(service));
486 log::debug!(
487 "Node '{}' PEER_FEEDER service registered",
488 config.node_name,
489 );
490 // F6: register the ELECTION service so peers can run
491 // run_acceptor against this node when proposing.
492 let election_svc =
493 Arc::new(ElectionService::new(Arc::clone(&election_state)));
494 dispatcher.register(ELECTION_SERVICE_NAME, election_svc);
495 log::debug!(
496 "Node '{}' ELECTION service registered",
497 config.node_name,
498 );
499 }
500
501 let env = Self {
502 config,
503 node_state,
504 group_service,
505 vlsn_index,
506 ack_tracker,
507 stats,
508 feeders,
509 replica_stream,
510 master_tracker,
511 listeners: RwLock::new(Vec::new()),
512 shutdown: AtomicBool::new(false),
513 tcp_dispatcher,
514 bound_addr,
515 env_impl: StdMutex::new(None),
516 io_threads: StdMutex::new(Vec::new()),
517 io_shutdown: Arc::new(AtomicBool::new(false)),
518 restore_registered: AtomicBool::new(restore_registered_init),
519 peer_scanner,
520 dtvlsn: std::sync::atomic::AtomicU64::new(0),
521 election_state,
522 self_weak: OnceLock::new(),
523 feeder_channels: StdMutex::new(HashMap::new()),
524 feeder_queues: std::sync::RwLock::new(HashMap::new()),
525 active_feeder_runners: StdMutex::new(HashMap::new()),
526 wal_vlsn_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
527 wal_feeds_served: Arc::new(std::sync::atomic::AtomicU64::new(0)),
528 consistency_tracker: StdMutex::new(None),
529 };
530
531 Ok(env)
532 }
533
534 /// Open a replicated environment with the standard production
535 /// lifecycle.
536 ///
537 /// This is the entry point recommended by the mdBook chapters:
538 /// it allocates the `ReplicatedEnvironment`, registers all
539 /// services on the TCP dispatcher, and spawns the **election
540 /// driver** background thread that runs Paxos rounds against
541 /// known peers until the node has resolved into either Master or
542 /// Replica state.
543 ///
544 /// Closes finding F6 of the 2026 review.
545 ///
546 /// Use [`ReplicatedEnvironment::new`] directly only when the
547 /// caller plans to drive state transitions explicitly (test
548 /// harnesses, scripted bootstrap, recovery tooling).
549 pub fn open(config: RepConfig) -> Result<Arc<Self>> {
550 let env = Arc::new(Self::new(config)?);
551 env.init_self_weak();
552 env.start_election_driver();
553 env.start_vlsn_persistence_daemon();
554 env.register_admin_service();
555 Ok(env)
556 }
557
558 /// Build the service dispatcher for this node.
559 ///
560 /// Phase 3 logic: when `config.transport_kind == Tls` AND
561 /// `config.tls_config` is `Some`, start a
562 /// [`crate::net::service_dispatcher::TlsTcpServiceDispatcher`] that
563 /// enforces mTLS with the configured `peer_allowlist`. Otherwise
564 /// start the plain-TCP [`TcpServiceDispatcher`].
565 ///
566 /// Returns `(dispatcher, bound_addr)` or a `RepError` on bind / TLS
567 /// config failure.
568 fn build_dispatcher(
569 #[cfg_attr(not(feature = "tls-rustls"), allow(unused_variables))]
570 config: &RepConfig,
571 addr: SocketAddr,
572 ) -> Result<(AnyServiceDispatcher, SocketAddr)> {
573 #[cfg(feature = "tls-rustls")]
574 if config.transport_kind == crate::rep_config::RepTransportKind::Tls {
575 use crate::auth::PeerAllowlist;
576 use crate::net::service_dispatcher::TlsTcpServiceDispatcher;
577 let tls = config.tls_config.as_ref().ok_or_else(|| {
578 RepError::ConfigError(format!(
579 "node '{}': transport_kind=Tls requires a tls_config",
580 config.node_name,
581 ))
582 })?;
583 let allowlist =
584 PeerAllowlist::new(config.peer_allowlist.iter().cloned());
585 // Fail-closed: an empty allowlist with TLS transport is a
586 // misconfiguration. The same policy is enforced at the TLS
587 // listener and QUIC constructors; downgrading to plain TCP here
588 // would be a silent security regression for a node that asked
589 // for TLS.
590 if allowlist.is_empty() {
591 return Err(RepError::ConfigError(format!(
592 "node '{}': transport_kind=Tls requires a non-empty \
593 peer_allowlist (mTLS enforcement is fail-closed)",
594 config.node_name,
595 )));
596 }
597 let disp = TlsTcpServiceDispatcher::new(addr, tls, allowlist)?;
598 let bound = disp.start()?;
599 return Ok((AnyServiceDispatcher::Tls(disp), bound));
600 }
601 // Plain-TCP dispatcher (default or when TLS config is missing).
602 let disp = TcpServiceDispatcher::new(addr).map_err(|e| {
603 RepError::NetworkError(format!("TCP dispatcher init: {e}"))
604 })?;
605 let bound = disp.start()?;
606 Ok((AnyServiceDispatcher::Plain(disp), bound))
607 }
608
609 /// Populate the env's self-referential `Weak` so background
610 /// threads can obtain a back-reference for auto-orchestrated
611 /// follow-up actions (e.g. replica auto-bootstrap on
612 /// `NeedsRestore`). Idempotent: subsequent calls are silent
613 /// no-ops because the inner [`OnceLock`] only accepts one set.
614 ///
615 /// Callers that wrap the env in `Arc` and want auto-bootstrap
616 /// behaviour should call this immediately after construction.
617 /// `Self::open` already does so. Test harnesses that drive
618 /// transitions manually (`RepTestBase`) also call this so the
619 /// auto-bootstrap path is exercised in tests.
620 pub fn init_self_weak(self: &Arc<Self>) {
621 let _ = self.self_weak.set(Arc::downgrade(self));
622 }
623
624 /// Register the `ADMIN` service handler on the TCP dispatcher.
625 ///
626 /// Closes findings F7 / F8. Holds a `Weak<Self>` so the handler
627 /// does not extend the env's lifetime. Idempotent: re-registering
628 /// is harmless because `TcpServiceDispatcher::register` overwrites
629 /// the existing handler.
630 pub fn register_admin_service(self: &Arc<Self>) {
631 if let Some(ref dispatcher) = self.tcp_dispatcher {
632 crate::group_admin::register_admin_service(
633 dispatcher,
634 Arc::downgrade(self),
635 );
636 log::debug!(
637 "Node '{}' ADMIN service registered",
638 self.config.node_name,
639 );
640 }
641 }
642
643 /// Spawn the VLSN-index persistence daemon (F11).
644 ///
645 /// Periodically (every 2 seconds) snapshots the in-memory
646 /// `VlsnIndex` to `<env_home>/vlsn.idx` so a clean restart can
647 /// resume from where the replica left off without a full network
648 /// restore. No-op when `config.env_home` is `None`.
649 ///
650 /// Idempotent: only one daemon is ever spawned per env.
651 pub fn start_vlsn_persistence_daemon(self: &Arc<Self>) {
652 let Some(home) = self.config.env_home.clone() else {
653 return;
654 };
655 {
656 let threads = self.io_threads.lock().unwrap();
657 if threads.iter().any(|h| {
658 h.thread()
659 .name()
660 .is_some_and(|n| n.starts_with("noxu-vlsn-flush-"))
661 }) {
662 return;
663 }
664 }
665
666 let vlsn_index = Arc::clone(&self.vlsn_index);
667 let name = format!("noxu-vlsn-flush-{}", self.config.node_name);
668 let me = Arc::clone(self);
669 let interval = Duration::from_secs(2);
670
671 let handle = std::thread::Builder::new()
672 .name(name)
673 .spawn(move || {
674 use std::sync::atomic::Ordering;
675 let mut last_persisted_vlsn: u64 = 0;
676 while !me.io_shutdown.load(Ordering::SeqCst)
677 && !me.is_shutdown()
678 {
679 std::thread::sleep(interval);
680 if me.io_shutdown.load(Ordering::SeqCst) {
681 break;
682 }
683 let latest = vlsn_index.get_latest_vlsn();
684 if latest == last_persisted_vlsn {
685 // Nothing new to flush.
686 continue;
687 }
688 // X-2: cap the flush at the last durable checkpoint's
689 // end LSN so the persisted VLSN index never claims
690 // VLSNs beyond the durable B-tree state. After a crash
691 // the recovered tree and the index will be coherent.
692 let cap_lsn = me
693 .env_impl
694 .lock()
695 .unwrap()
696 .as_ref()
697 .and_then(|e| e.get_checkpointer())
698 .map(|c| c.get_last_checkpoint_end())
699 .unwrap_or(noxu_util::NULL_LSN);
700 match crate::vlsn::persist::flush_to_disk_capped(
701 &vlsn_index,
702 &home,
703 cap_lsn,
704 ) {
705 Ok(n) => {
706 log::trace!(
707 "vlsn-flush: persisted {} entries (latest vlsn={}, cap_lsn={:?})",
708 n,
709 latest,
710 cap_lsn,
711 );
712 last_persisted_vlsn = latest;
713 }
714 Err(e) => {
715 log::warn!(
716 "vlsn-flush: failed to persist VLSN index to {}: {}",
717 home.display(),
718 e
719 );
720 }
721 }
722 }
723 // Final flush on shutdown so a clean close is recoverable.
724 // Cap at the last checkpoint even for the shutdown flush.
725 let cap_lsn = me
726 .env_impl
727 .lock()
728 .unwrap()
729 .as_ref()
730 .and_then(|e| e.get_checkpointer())
731 .map(|c| c.get_last_checkpoint_end())
732 .unwrap_or(noxu_util::NULL_LSN);
733 if let Err(e) = crate::vlsn::persist::flush_to_disk_capped(
734 &vlsn_index,
735 &home,
736 cap_lsn,
737 ) {
738 log::warn!(
739 "vlsn-flush (final): failed to persist VLSN index: {}",
740 e
741 );
742 }
743 })
744 .expect("failed to spawn noxu-vlsn-flush thread");
745
746 self.io_threads.lock().unwrap().push(handle);
747 log::debug!(
748 "Node '{}' VLSN persistence daemon started",
749 self.config.node_name,
750 );
751 }
752
753 /// Spawn the election driver background thread.
754 ///
755 /// While the env is in `Detached` or `Unknown` state and no master
756 /// is known, the driver periodically attempts a Paxos election
757 /// against peers in `GroupService` (whose ELECTION services were
758 /// registered at `open()` time). On success the driver calls
759 /// `become_master` (if this node is the winner) or `become_replica`
760 /// (otherwise). On failure (no quorum), the driver waits
761 /// `config.election_timeout` and tries again.
762 ///
763 /// The driver respects `io_shutdown`; on env close the loop exits
764 /// promptly.
765 ///
766 /// Idempotent: a second call is a no-op (only one driver thread is
767 /// ever spawned per env).
768 pub fn start_election_driver(self: &Arc<Self>) {
769 use std::sync::atomic::Ordering;
770 // Reuse io_shutdown for cancellation; a successful spawn is
771 // recorded by appending to io_threads, so a duplicate call
772 // would re-add a thread — we use a one-shot `AtomicBool`
773 // sentinel placed in the io_shutdown's slot via a new field.
774 // Cheaper: a static name check on io_threads is impossible;
775 // instead, gate spawning on whether any io_thread already
776 // carries the driver name.
777 {
778 let threads = self.io_threads.lock().unwrap();
779 if threads.iter().any(|h| {
780 h.thread()
781 .name()
782 .is_some_and(|n| n.starts_with("noxu-election-"))
783 }) {
784 return;
785 }
786 }
787
788 let me = Arc::clone(self);
789 let name = format!("noxu-election-{}", self.config.node_name);
790 let handle = std::thread::Builder::new()
791 .name(name)
792 .spawn(move || {
793 me.run_election_loop();
794 })
795 .expect("failed to spawn election driver thread");
796 self.io_threads.lock().unwrap().push(handle);
797 log::debug!("Node '{}' election driver started", self.config.node_name,);
798 // Keep ordering sane on the io_shutdown flag.
799 let _ = self.io_shutdown.load(Ordering::SeqCst);
800 }
801
802 /// Body of the election driver loop. Public only for tests; called
803 /// by [`Self::start_election_driver`].
804 fn run_election_loop(self: Arc<Self>) {
805 use std::sync::atomic::Ordering;
806 // Maintain an internal monotonically increasing election term.
807 // Each successful or failed round bumps the term so retries do
808 // not collide with stale acceptor promises.
809 let mut term: u64 = 1;
810
811 loop {
812 if self.io_shutdown.load(Ordering::SeqCst) {
813 return;
814 }
815 if self.is_shutdown() {
816 return;
817 }
818
819 let state = self.node_state.get_state();
820 // Stop driving once we've resolved into Master/Replica;
821 // re-arm only if the node returns to Unknown.
822 if matches!(state, NodeState::Master | NodeState::Replica) {
823 std::thread::sleep(Duration::from_millis(200));
824 continue;
825 }
826 if matches!(state, NodeState::Shutdown) {
827 return;
828 }
829
830 // Probe peers for an active master via the existing
831 // GroupService cache. In the absence of a heartbeat path
832 // we rely on master_tracker (set by become_replica from
833 // the receive loop).
834 if let Some(master_name) = self.master_tracker.get_master()
835 && master_name != self.config.node_name
836 {
837 let _ = self.become_replica(&master_name);
838 continue;
839 }
840
841 // Snapshot peers to dial for ELECTION.
842 let peers: Vec<(String, SocketAddr)> = self
843 .group_service
844 .get_all_nodes()
845 .into_iter()
846 .filter(|n| n.name != self.config.node_name)
847 .filter_map(|n| {
848 format!("{}:{}", n.host, n.port)
849 .parse::<SocketAddr>()
850 .ok()
851 .map(|a| (n.name, a))
852 })
853 .collect();
854
855 // Build the local rep group view used by run_election to
856 // compute quorum and resolve the winner name. Include
857 // self.
858 let group = self.local_rep_group_with_self();
859
860 // Update election state for any concurrent acceptor calls.
861 let our_vlsn = self.vlsn_index.get_latest_vlsn();
862 self.election_state.set_vlsn(our_vlsn);
863 self.election_state.set_term(term);
864 // D2: advertise our DTVLSN as the major election-ranking key.
865 self.election_state.set_dtvlsn(self.get_dtvlsn());
866
867 // Connect to each peer's ELECTION service. Failures are
868 // tolerated: a peer that doesn't answer simply contributes
869 // no vote. The election may still reach quorum in the
870 // remaining peers.
871 let mut channels: Vec<Arc<dyn crate::net::channel::Channel>> =
872 Vec::new();
873 for (peer_name, addr) in &peers {
874 match crate::net::service_dispatcher::connect_to_service(
875 *addr,
876 ELECTION_SERVICE_NAME,
877 ) {
878 Ok(ch) => {
879 let arc: Arc<dyn crate::net::channel::Channel> =
880 Arc::new(ch);
881 channels.push(arc);
882 }
883 Err(e) => {
884 log::trace!(
885 "election driver: peer {} ({}) unreachable: {}",
886 peer_name,
887 addr,
888 e
889 );
890 }
891 }
892 }
893
894 // Resolve our own node_id from the group; if not present
895 // we cannot run an election (closed-world guard — see F22).
896 let self_node_id =
897 group.get_node(&self.config.node_name).map(|n| n.node_id());
898 let self_node_id = match self_node_id {
899 Some(id) => id,
900 None => {
901 log::warn!(
902 "election driver: node '{}' not registered in \
903 own group view; sleeping",
904 self.config.node_name
905 );
906 std::thread::sleep(Duration::from_millis(200));
907 continue;
908 }
909 };
910
911 log::debug!(
912 "election driver on '{}': starting term={} with {} peers",
913 self.config.node_name,
914 term,
915 channels.len(),
916 );
917 let outcome = crate::elections::paxos::run_election_with_phi_dtvlsn(
918 self_node_id,
919 &self.config.node_name,
920 &group,
921 &channels,
922 our_vlsn,
923 /* priority */ 1,
924 term,
925 /* own_dtvlsn (D2 major ranking key) */
926 self.get_dtvlsn(),
927 None,
928 std::time::Duration::from_millis(500),
929 );
930
931 match outcome {
932 Some(winner_id) if winner_id == self_node_id => {
933 if let Err(e) = self.become_master(term) {
934 log::warn!(
935 "election driver: become_master failed: {}",
936 e
937 );
938 } else {
939 log::info!(
940 "election driver: '{}' became master at term {}",
941 self.config.node_name,
942 term,
943 );
944 }
945 }
946 Some(winner_id) => {
947 if let Some(winner_node) = group
948 .get_nodes()
949 .into_iter()
950 .find(|n| n.node_id() == winner_id)
951 {
952 if let Err(e) = self.become_replica(&winner_node.name) {
953 log::warn!(
954 "election driver: become_replica failed: {}",
955 e
956 );
957 } else {
958 log::info!(
959 "election driver: '{}' became replica of '{}' at term {}",
960 self.config.node_name,
961 winner_node.name,
962 term,
963 );
964 }
965 }
966 }
967 None => {
968 log::debug!(
969 "election driver on '{}' term={}: no quorum",
970 self.config.node_name,
971 term,
972 );
973 }
974 }
975
976 term = term.saturating_add(1);
977 // Back off so we don't pin the loop on transient failures.
978 std::thread::sleep(
979 self.config.election_timeout.min(Duration::from_millis(500)),
980 );
981 }
982 }
983
984 /// Internal: a `RepGroup` snapshot that includes self.
985 fn local_rep_group_with_self(&self) -> crate::rep_group::RepGroup {
986 let mut group = self.get_rep_group();
987 // Ensure self is present in the group view; the
988 // group_service does not auto-register the local node.
989 if group.get_node(&self.config.node_name).is_none() {
990 let mut self_node = crate::rep_node::RepNode::new(
991 self.config.node_name.clone(),
992 self.config.node_type,
993 self.config.node_host.clone(),
994 self.config.node_port,
995 /* node_id */ 0,
996 );
997 // Stable self node_id derived from the name hash so
998 // re-creations in the same process don't collide.
999 use std::hash::{Hash, Hasher};
1000 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1001 self.config.node_name.hash(&mut hasher);
1002 // Restrict to a u32 range and avoid 0 (reserved for
1003 // "unknown").
1004 let id = ((hasher.finish() as u32) | 1).max(1);
1005 self_node.node_id = id;
1006 group.add_node(self_node);
1007 }
1008 group
1009 }
1010
/// Return the socket address the TCP service dispatcher is bound to.
///
/// This may differ from the configured `node_port` when port 0 is used
/// (the OS assigns an ephemeral port). Returns `None` if the dispatcher
/// could not be started (e.g. the address is not resolvable).
///
/// The value is a copy of the `bound_addr` field; this accessor performs
/// no I/O.
pub fn bound_addr(&self) -> Option<SocketAddr> {
    self.bound_addr
}
1019
1020 /// Wire a live `EnvironmentImpl` into this replicated environment.
1021 ///
1022 /// After this call, state transitions (`become_master`, `become_replica`)
1023 /// will spawn real feeder/receiver I/O threads backed by the live log.
1024 ///
1025 /// If the RESTORE service was not registered at construction time (because
1026 /// `config.env_home` was `None`), it is registered here using the
1027 /// environment's actual home path. This mirrors`RepNode.envSetup()`
1028 /// which registers the restore handler during environment wiring.
1029 ///
1030 /// Environment reference wiring.
1031 /// `EnvironmentImpl` via `RepImpl.repNode.envImpl` in HA.
1032 pub fn with_environment(&self, env: Arc<EnvironmentImpl>) {
1033 // Register RESTORE service lazily if not already done.
1034 if !self.restore_registered.load(Ordering::SeqCst)
1035 && let Some(ref dispatcher) = self.tcp_dispatcher
1036 {
1037 let env_home = env.get_env_home().to_path_buf();
1038 let restore_server = NetworkRestoreServer::new(env_home.clone());
1039 dispatcher.register(RESTORE_SERVICE_NAME, Arc::new(restore_server));
1040 self.restore_registered.store(true, Ordering::SeqCst);
1041 log::debug!(
1042 "Node '{}' RESTORE service registered via with_environment \
1043 (env_home={})",
1044 self.config.node_name,
1045 env_home.display(),
1046 );
1047 }
1048
1049 // X-14: rebuild the VLSN index from recovery-replayed LN entries.
1050 // After a crash the on-disk vlsn.idx may be stale (either ahead of
1051 // the recovered B-tree, or behind if vlsn.idx was not flushed
1052 // after the last checkpoint). Re-registering all (vlsn, lsn) pairs
1053 // from the redo pass gives a consistent in-memory index.
1054 if !env.recovery_vlsns.is_empty() {
1055 log::info!(
1056 "Node '{}': rebuilding VLSN index from {} recovered entries",
1057 self.config.node_name,
1058 env.recovery_vlsns.len(),
1059 );
1060 for &(vlsn, lsn_u64) in &env.recovery_vlsns {
1061 let lsn = noxu_util::Lsn::from_u64(lsn_u64);
1062 self.vlsn_index.register(
1063 vlsn,
1064 lsn.file_number(),
1065 lsn.file_offset(),
1066 );
1067 }
1068 }
1069
1070 // X-1: truncate the VLSN index to the rollback matchpoint if recovery
1071 // detected a completed rollback period. The matchpoint is the highest
1072 // LSN that is still valid after the rollback; entries with higher VLSNs
1073 // correspond to data that was rolled back and must not appear in the
1074 // index.
1075 if let Some(matchpoint_lsn_u64) = env.recovery_rollback_matchpoint {
1076 // Find the latest VLSN whose LSN is at or before the matchpoint.
1077 // Scan the recovered VLSN pairs (sorted ascending) to find the
1078 // boundary.
1079 let safe_vlsn = env
1080 .recovery_vlsns
1081 .iter()
1082 .rev()
1083 .find(|&&(_, lsn_u64)| lsn_u64 <= matchpoint_lsn_u64)
1084 .map(|&(vlsn, _)| vlsn)
1085 .unwrap_or(0);
1086 log::info!(
1087 "Node '{}': truncating VLSN index after vlsn={} \
1088 (rollback matchpoint lsn={:#x})",
1089 self.config.node_name,
1090 safe_vlsn,
1091 matchpoint_lsn_u64,
1092 );
1093 self.vlsn_index.truncate_after(safe_vlsn);
1094 }
1095
1096 *self.env_impl.lock().unwrap() = Some(Arc::clone(&env));
1097
1098 // C-C2b: install the VLSN counter so log_txn_commit writes
1099 // VLSN-tagged headers. When become_master then spawns an
1100 // EnvironmentLogScanner-backed FeederRunner, it will find these
1101 // entries and auto-feed them to replicas without any
1102 // replicate_entry call from the application.
1103 env.set_replication_vlsn_counter(Arc::clone(&self.wal_vlsn_counter));
1104 }
1105
/// Get the current node state.
///
/// Returns the current state of the node associated with this replication
/// environment, as reported by `node_state`. If the caller's intent is to
/// track the state of the node over time, `StateChangeListener` may be a
/// more convenient and efficient approach than polling this method.
pub fn get_state(&self) -> NodeState {
    self.node_state.get_state()
}
1116
1117 /// Check if this node is the master.
1118 ///
1119 /// Returns true if the node's current state is Master.
1120 pub fn is_master(&self) -> bool {
1121 self.node_state.get_state() == NodeState::Master
1122 }
1123
1124 /// Returns true if this node is an *authoritative* master (D4, JE
1125 /// `ElectionQuorum.isAuthoritativeMaster`): it is the group master AND it
1126 /// is still connected to enough replicas that, including itself, a
1127 /// SIMPLE_MAJORITY quorum is present.
1128 ///
1129 /// A master on the minority side of a network partition is NOT
1130 /// authoritative — it must not claim the special election ranking
1131 /// (`MASTER_RANKING`) nor (eventually) continue accepting writes, so the
1132 /// majority side can elect a fresh master without it competing
1133 /// (split-brain prevention).
1134 ///
1135 /// "Active replica count" = the number of currently-connected push-feeder
1136 /// runners serving *electable* peers (Monitors/Secondaries do not count
1137 /// toward the election quorum). `+ 1` for this master itself.
1138 pub fn is_authoritative_master(&self) -> bool {
1139 if !self.is_master() {
1140 return false;
1141 }
1142 let group = self.get_rep_group();
1143 // Total electable nodes (incl. self) — peers + this master.
1144 let electable_total: usize = group
1145 .get_nodes()
1146 .iter()
1147 .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
1148 .count()
1149 + 1; // +1 for self/master (not registered as a peer)
1150
1151 // Active replicas = connected feeder runners whose peer is electable.
1152 let active_electable_replicas: usize = {
1153 let runners = self.active_feeder_runners.lock().unwrap();
1154 runners
1155 .keys()
1156 .filter(|name| {
1157 group
1158 .get_node(name)
1159 .map(|n| {
1160 n.node_type == crate::node_type::NodeType::Electable
1161 })
1162 .unwrap_or(false)
1163 })
1164 .count()
1165 };
1166 Self::authoritative_quorum_met(
1167 active_electable_replicas,
1168 electable_total,
1169 )
1170 }
1171
/// Pure SIMPLE_MAJORITY check backing `is_authoritative_master` (JE
/// `ElectionQuorum.isAuthoritativeMaster`).
///
/// The master counts itself as present, so the quorum is met when
/// `active_electable_replicas + 1 >= electable_total / 2 + 1`.
fn authoritative_quorum_met(
    active_electable_replicas: usize,
    electable_total: usize,
) -> bool {
    // Nodes present on this side: the connected replicas plus the master.
    let present = active_electable_replicas + 1;
    // Smallest strict majority of the electable membership.
    let required = electable_total / 2 + 1;
    present >= required
}
1182
1183 /// Check if this node is a replica.
1184 ///
1185 /// Returns true if the node's current state is Replica.
1186 pub fn is_replica(&self) -> bool {
1187 self.node_state.get_state() == NodeState::Replica
1188 }
1189
1190 /// Returns true if the node is currently participating in the group
1191 /// as a Replica or a Master.
1192 pub fn is_active(&self) -> bool {
1193 self.node_state.get_state().is_active()
1194 }
1195
/// Get the node name.
///
/// Returns the unique name used to identify this replicated environment,
/// borrowed from the configuration (`config.node_name`).
pub fn get_node_name(&self) -> &str {
    self.config.node_name.as_str()
}
1204
/// Get the group name.
///
/// Returns the name of the replication group this node belongs to,
/// borrowed from the configuration (`config.group_name`).
pub fn get_group_name(&self) -> &str {
    self.config.group_name.as_str()
}
1211
/// Get the current master (if known).
///
/// Returns the name of the node that `master_tracker` currently records
/// as the master, or `None` if the master is not known (e.g. the node is
/// in the Unknown or Detached state).
pub fn get_master_name(&self) -> Option<String> {
    self.master_tracker.get_master()
}
1220
/// Get the replication group info.
///
/// Returns a borrow of the `GroupService` holding the description of the
/// replication group as known by this node. The replicated group metadata
/// is stored in a replicated database and updates are propagated by the
/// current master node to all replicas. If this node is not the master,
/// it is possible for its description of the group to be out of date.
pub fn get_group(&self) -> &GroupService {
    &self.group_service
}
1233
1234 /// Add a peer node to the replication group at runtime.
1235 ///
1236 /// The node is registered in the `GroupService` so elections and quorum
1237 /// calculations immediately reflect the new membership.
1238 pub fn add_peer(&self, node: crate::rep_node::RepNode) -> Result<()> {
1239 use crate::group_service::NodeInfo;
1240 use std::time::Instant;
1241
1242 let info = NodeInfo {
1243 name: node.name.clone(),
1244 node_type: node.node_type,
1245 host: node.host.clone(),
1246 port: node.port,
1247 node_id: node.node_id,
1248 joined_at: Instant::now(),
1249 last_seen: Instant::now(),
1250 is_active: true,
1251 known_vlsn: 0,
1252 log_range: None,
1253 read_capacity_pct: node.read_capacity_pct,
1254 write_capacity_pct: node.write_capacity_pct,
1255 latency_hint_ms: node.latency_hint_ms,
1256 };
1257 self.group_service.add_node(info)?;
1258 log::info!(
1259 "Node '{}': added peer '{}' ({}:{}) to group '{}'",
1260 self.config.node_name,
1261 node.name,
1262 node.host,
1263 node.port,
1264 self.config.group_name,
1265 );
1266
1267 // F9: if we are the current master, immediately register a
1268 // `Feeder` tracker for the new peer so AckTracker bookkeeping
1269 // and downstream pull-based streaming work without a forced
1270 // re-election.
1271 if self.is_master()
1272 && (node.node_type == crate::node_type::NodeType::Electable
1273 || node.node_type == crate::node_type::NodeType::Secondary)
1274 {
1275 let mut feeders = self.feeders.write();
1276 if !feeders.iter().any(|f| f.get_replica_name() == node.name) {
1277 feeders.push(Feeder::new(node.name.clone()));
1278 log::debug!(
1279 "Node '{}' (master): dispatched Feeder for new peer '{}'",
1280 self.config.node_name,
1281 node.name,
1282 );
1283 }
1284 }
1285 Ok(())
1286 }
1287
1288 /// Remove a peer node from the replication group by name.
1289 ///
1290 /// The node is deregistered from the `GroupService`. Elections initiated
1291 /// after this call will not include the removed node in quorum calculations.
1292 pub fn remove_peer(&self, name: &str) -> Result<()> {
1293 self.group_service.remove_node(name)?;
1294 log::info!(
1295 "Node '{}': removed peer '{}' from group '{}'",
1296 self.config.node_name,
1297 name,
1298 self.config.group_name,
1299 );
1300 Ok(())
1301 }
1302
1303 /// Update the capacity and latency metadata of an existing peer.
1304 ///
1305 /// Only the following fields are updated from `node`:
1306 /// - `read_capacity_pct`
1307 /// - `write_capacity_pct`
1308 /// - `latency_hint_ms`
1309 ///
1310 /// The node's identity (name, address, port, node_type) is preserved.
1311 /// Safe to call while replication is active.
1312 ///
1313 /// If the quorum policy is `Flexible` or `Expression`, the quorum system
1314 /// is rebuilt to reflect the new capacity/latency weights.
1315 ///
1316 /// # Note
1317 ///
1318 /// `update_peer_metadata` does not currently re-run
1319 /// `QuorumPolicy::validate(electable_count)` after the metadata
1320 /// change. An LP-optimal `Expression` quorum that was safe before
1321 /// the update may no longer satisfy the intersection property
1322 /// afterwards. Until automatic revalidation lands, deployments
1323 /// using `QuorumPolicy::Expression` should call
1324 /// `quorum_policy().validate(get_rep_group().electable_count())`
1325 /// on the returned `RepGroup` after every metadata change and
1326 /// fail the operator-facing operation if validation reports
1327 /// unsafety.
1328 pub fn update_peer_metadata(
1329 &self,
1330 name: &str,
1331 node: crate::rep_node::RepNode,
1332 ) -> Result<()> {
1333 self.group_service.update_node_metadata(
1334 name,
1335 node.read_capacity_pct,
1336 node.write_capacity_pct,
1337 node.latency_hint_ms,
1338 )?;
1339 log::info!(
1340 "Node '{}': updated metadata for peer '{}' \
1341 (read_cap={}, write_cap={}, latency={}ms)",
1342 self.config.node_name,
1343 name,
1344 node.read_capacity_pct,
1345 node.write_capacity_pct,
1346 node.latency_hint_ms,
1347 );
1348 Ok(())
1349 }
1350
1351 /// Returns a snapshot of the current replication group as a `RepGroup`.
1352 ///
1353 /// The snapshot reflects the state at the time of the call; subsequent
1354 /// `add_peer` / `remove_peer` calls are not reflected in it.
1355 pub fn get_rep_group(&self) -> crate::rep_group::RepGroup {
1356 use crate::rep_group::RepGroup;
1357
1358 let mut group = RepGroup::new(
1359 self.config.group_name.clone(),
1360 self.group_service.get_group_id(),
1361 );
1362 for info in self.group_service.get_all_nodes() {
1363 let mut node = crate::rep_node::RepNode::new(
1364 info.name.clone(),
1365 info.node_type,
1366 info.host.clone(),
1367 info.port,
1368 info.node_id,
1369 );
1370 node.read_capacity_pct = info.read_capacity_pct;
1371 node.write_capacity_pct = info.write_capacity_pct;
1372 node.latency_hint_ms = info.latency_hint_ms;
1373 group.add_node(node);
1374 }
1375 group
1376 }
1377
/// Get the replication configuration.
///
/// Returns a borrow of the replication configuration that was used to
/// create this environment.
pub fn get_config(&self) -> &RepConfig {
    &self.config
}
1387
/// Get the current VLSN range on this node.
///
/// Returns the range of VLSNs currently available on this node, as
/// reported by the VLSN index.
pub fn get_vlsn_range(&self) -> VlsnRange {
    self.vlsn_index.get_range()
}
1394
/// Get the latest VLSN.
///
/// Returns the most recent VLSN registered in this node's VLSN index.
pub fn get_current_vlsn(&self) -> u64 {
    self.vlsn_index.get_latest_vlsn()
}
1401
/// The replica-side replication stream state (master high-water, applied
/// VLSN, lag). Used by the consistency read-gate to learn the master's
/// latest known commit VLSN (JE `ConsistencyTracker.masterTxnEndVLSN`,
/// updated by heartbeats).
///
/// Returns a borrow of the `replica_stream` field.
pub fn replica_stream(&self) -> &ReplicaStream {
    &self.replica_stream
}
1409
1410 /// REP-10 (B): mint a [`CommitToken`] for the most recent commit on this
1411 /// master.
1412 ///
1413 /// Port of `MasterTxn.getCommitToken`: returns
1414 /// `new CommitToken(envUUID, commitVLSN.getSequence())`. A client that
1415 /// just performed a write on the master calls this to obtain the token it
1416 /// will hand to a subsequent replica read
1417 /// (`Transaction.getCommitToken`). Returns `None` on a non-master or when
1418 /// no commit VLSN exists yet (JE returns `null` when `commitVLSN.isNull`).
1419 ///
1420 /// The token's VLSN is the master's latest assigned VLSN — the same
1421 /// `wal_vlsn_counter` high-water the ack gate keys on (the commit was
1422 /// logged immediately before this call).
1423 pub fn commit_token(&self) -> Option<crate::CommitToken> {
1424 if !self.is_master() {
1425 return None;
1426 }
1427 let vlsn = self.wal_vlsn_counter.load(Ordering::Acquire);
1428 crate::CommitToken::new(self.config.group_name.clone(), vlsn)
1429 }
1430
1431 /// REP-10 (C): the read-gate. Enforce a replica read-consistency policy
1432 /// before a read transaction proceeds.
1433 ///
1434 /// Port of `ReplicaConsistencyPolicy.ensureConsistency` as invoked from a
1435 /// replica `beginTransaction` (`RepImpl.checkConsistency` /
1436 /// `Replica.getConsistencyTracker().awaitVLSN`). Called by the replica
1437 /// env's transaction-begin / read path.
1438 ///
1439 /// - `policy_override`: a per-transaction policy (JE
1440 /// `TransactionConfig.setConsistencyPolicy`). When `None`, the node's
1441 /// configured default is used (`ReplicationConfig.setConsistencyPolicy`
1442 /// — [`RepConfig::consistency_policy`]).
1443 ///
1444 /// On a master, or when the effective policy is
1445 /// [`ConsistencyPolicy::NoConsistency`], this returns immediately so
1446 /// existing behaviour is unchanged unless a policy is set. On a replica
1447 /// with a non-`NoConsistency` policy it BLOCKS until the replica has
1448 /// replayed far enough or the policy timeout expires (a clean
1449 /// [`RepError`], never a hang).
1450 pub fn begin_read_consistency(
1451 &self,
1452 policy_override: Option<&crate::ConsistencyPolicy>,
1453 ) -> Result<()> {
1454 // Resolve the effective policy: per-txn override else node default.
1455 let default_policy = self.config.consistency_policy.clone();
1456 let policy = policy_override.unwrap_or(&default_policy);
1457
1458 // NoConsistency never blocks (the master path also lands here).
1459 if matches!(policy, crate::ConsistencyPolicy::NoConsistency) {
1460 return Ok(());
1461 }
1462
1463 // A non-No policy only makes sense on a replica with a live replay
1464 // (its last_applied_vlsn is the wait predicate). Without a tracker
1465 // there is nothing to wait on — treat as immediately consistent
1466 // rather than block forever (e.g. on the master, which is by
1467 // definition fully current).
1468 let tracker = self.consistency_tracker.lock().unwrap().clone();
1469 let Some(tracker) = tracker else {
1470 return Ok(());
1471 };
1472
1473 // Surface the master's latest known VLSN for the time policy
1474 // (heartbeat / feeder high-water). JE ConsistencyTracker tracks this
1475 // via trackHeartbeat; here we read the replica_stream high-water.
1476 let master_vlsn = self.replica_stream.get_master_vlsn();
1477 if master_vlsn > 0 {
1478 tracker.set_master_vlsn(master_vlsn);
1479 }
1480
1481 tracker.await_consistency(policy)
1482 }
1483
/// REP-10 (C) test seam: build a [`ConsistencyTracker`] over an existing
/// `last_applied_vlsn` handle and install it on this environment,
/// replacing any tracker that was installed before.
///
/// Lets a test drive a real [`noxu_dbi::ReplicaReplay`] and exercise
/// [`Self::begin_read_consistency`] end-to-end without standing up TCP
/// feeder/receiver threads. Not part of the production API.
///
/// Returns a clone of the installed tracker.
#[cfg(any(test, feature = "test-harness"))]
pub fn install_consistency_tracker_for_test(
    &self,
    last_applied_vlsn: std::sync::Arc<std::sync::atomic::AtomicU64>,
) -> crate::ConsistencyTracker {
    let handle = crate::ConsistencyTracker::new(last_applied_vlsn);
    let installed = handle.clone();
    let mut slot = self.consistency_tracker.lock().unwrap();
    *slot = Some(installed);
    handle
}
1500
1501 /// REP-1 STEP 5 (D): run a live syncup against `feeder` and, if this
1502 /// replica's tail diverged, ROLL IT BACK to the common matchpoint instead
1503 /// of falling back to a network restore.
1504 ///
1505 /// Port of the replica's side of JE `ReplicaFeederSyncup.execute`:
1506 /// `findMatchpoint` → `verifyRollback` → `replay.rollback` →
1507 /// `vlsnIndex.truncateFromTail` → resume streaming from `matchpoint + 1`.
1508 ///
1509 /// `feeder` is the master's [`crate::stream::syncup::SyncupView`] (built
1510 /// from its VLSN index, or exchanged over the syncup wire protocol in
1511 /// [`crate::stream::syncup_protocol`]). The decision uses the same pure
1512 /// core the protocol drives: `find_matchpoint` + `verify_rollback`.
1513 ///
1514 /// Returns:
1515 /// - [`SyncupAction::RolledBack`] — the divergent tail was truncated to
1516 /// the matchpoint; resume streaming from `start_vlsn`. The non-diverged
1517 /// case (matchpoint == last VLSN) returns `RolledBack` with an empty
1518 /// tail and is a no-op rollback.
1519 /// - [`SyncupAction::NeedsRestore`] — `verify_rollback` selected
1520 /// NetworkRestore (no common matchpoint) or HardRecovery (the rollback
1521 /// would cross a committed/aborted txn); the caller must network-restore
1522 /// per JE.
1523 ///
1524 /// The non-diverged fast path (the replica's range is a prefix of the
1525 /// feeder's) is still served by the range-check `negotiate_syncup`
1526 /// (`SyncupResult::CanServe`) in the streaming path; this method is the
1527 /// DIVERGED case.
1528 pub fn syncup_with_feeder(
1529 &self,
1530 feeder: &dyn crate::stream::syncup::SyncupView,
1531 ) -> Result<SyncupAction> {
1532 // Build the replica's SyncupView. When a real LogManager is wired,
1533 // re-read the log (SyncupLogView) so the per-VLSN fingerprint is the
1534 // actual record checksum (JE ReplicaSyncupReader). Otherwise (the
1535 // VLSN-index-only harness model) fall back to the index view, whose
1536 // fingerprint is the LSN.
1537 let log_view: Option<crate::stream::syncup_reader::SyncupLogView> =
1538 self.env_impl.lock().unwrap().clone().and_then(|env| {
1539 if let Some(lm) = env.get_log_manager() {
1540 // Flush so all VLSN-tagged entries are on disk before the
1541 // backward re-read (JE flushNoSync in initScan).
1542 let _ = lm.flush_sync();
1543 }
1544 crate::stream::syncup_reader::SyncupLogView::scan(
1545 env.get_env_home(),
1546 )
1547 });
1548 let index_view = VlsnIndexView::from_index(&self.vlsn_index);
1549 let replica_view: &dyn crate::stream::syncup::SyncupView =
1550 match &log_view {
1551 Some(v) => v,
1552 None => &index_view,
1553 };
1554
1555 let range = self.vlsn_index.get_range();
1556 let last_sync = range.get_last_sync();
1557 let last_txn_end = range.get_last_txn_end();
1558 let to_vlsn = |v: u64| {
1559 if v == 0 {
1560 noxu_util::NULL_VLSN
1561 } else {
1562 noxu_util::Vlsn::new(v as i64)
1563 }
1564 };
1565
1566 // Step 1: find the matchpoint (JE findMatchpoint).
1567 let matchpoint = find_matchpoint(replica_view, feeder);
1568
1569 // numPassedCommits: count of txn ends strictly above the matchpoint.
1570 // When we re-read the log, count them exactly; otherwise rely on the
1571 // numeric `lastTxnEnd <= matchpoint` test in verify_rollback (which
1572 // matches JE when sync points == txn ends).
1573 let num_passed_commits = match (&log_view, &matchpoint) {
1574 (Some(v), Matchpoint::Found { vlsn, .. }) => {
1575 v.num_passed_commits(*vlsn)
1576 }
1577 _ => 0,
1578 };
1579 let decision = verify_rollback(
1580 &matchpoint,
1581 to_vlsn(last_txn_end),
1582 to_vlsn(last_sync),
1583 num_passed_commits,
1584 );
1585
1586 match decision {
1587 RollbackDecision::RollbackToMatchpoint {
1588 matchpoint_vlsn,
1589 start_vlsn,
1590 } => {
1591 let matchpoint_lsn = match &matchpoint {
1592 Matchpoint::Found { lsn, .. } => *lsn,
1593 Matchpoint::None => 0,
1594 };
1595 // Collect the rolled-back LSNs (VLSNs strictly above the
1596 // matchpoint). When the real log was re-read, use its EXACT
1597 // per-VLSN LSNs so make-invisible flips the right header bytes
1598 // (the sparse VLSN index only stores boundary/last LSNs).
1599 let mp = matchpoint_vlsn.sequence().max(0) as u64;
1600 let rollback_lsns: Vec<noxu_util::Lsn> = match &log_view {
1601 Some(v) => v
1602 .entries()
1603 .filter(|(vlsn, _)| (vlsn.sequence() as u64) > mp)
1604 .map(|(_, e)| noxu_util::Lsn::from_u64(e.lsn))
1605 .collect(),
1606 None => self
1607 .vlsn_index
1608 .snapshot_entries()
1609 .into_iter()
1610 .filter(|(vlsn, _, _)| *vlsn > mp)
1611 .map(|(_, file, offset)| {
1612 noxu_util::Lsn::new(file, offset)
1613 })
1614 .collect(),
1615 };
1616 self.execute_rollback(mp, matchpoint_lsn, &rollback_lsns)?;
1617 Ok(SyncupAction::RolledBack {
1618 matchpoint_vlsn: mp,
1619 start_vlsn: start_vlsn.sequence().max(0) as u64,
1620 })
1621 }
1622 RollbackDecision::HardRecovery { .. }
1623 | RollbackDecision::NetworkRestore => {
1624 Ok(SyncupAction::NeedsRestore)
1625 }
1626 }
1627 }
1628
1629 /// Execute the durable + in-memory rollback to `matchpoint_vlsn`
1630 /// (LSN `matchpoint_lsn`). Port of JE `Replay.rollback` +
1631 /// `vlsnIndex.truncateFromTail`.
1632 ///
1633 /// Durable steps (RollbackStart/End + make-invisible + fsync) go through
1634 /// [`noxu_recovery::rollback`] when a `LogManager` is wired; the VLSN index
1635 /// is always truncated to the matchpoint so the reported range matches the
1636 /// rolled-back state and streaming resumes from `matchpoint + 1`.
1637 fn execute_rollback(
1638 &self,
1639 matchpoint_vlsn: u64,
1640 matchpoint_lsn: u64,
1641 rollback_lsns: &[noxu_util::Lsn],
1642 ) -> Result<()> {
1643 // Durable rollback (RollbackStart … make-invisible … RollbackEnd) when
1644 // a live LogManager is available. The harness-level env (VLSN-index
1645 // only, no LogManager) skips the on-disk steps; the index truncation
1646 // below is what makes the replica converge in that model.
1647 if let Some(env) = self.env_impl.lock().unwrap().clone()
1648 && let Some(log_mgr) = env.get_log_manager()
1649 && matchpoint_lsn != 0
1650 {
1651 let mp_lsn = noxu_util::Lsn::from_u64(matchpoint_lsn);
1652 // active_txn_ids: the harness/VLSN-index model has no live txn
1653 // table here; the durable RollbackStart records an empty set, and
1654 // the per-txn gating (REP-1 STEP 2) applies during recovery when
1655 // the analysis pass rebuilds the active set. A future pass can
1656 // thread the live ReplayTxn ids through (JE
1657 // localActiveTxns.keySet()).
1658 noxu_recovery::rollback(
1659 &log_mgr,
1660 noxu_util::Vlsn::new(matchpoint_vlsn as i64),
1661 mp_lsn,
1662 Vec::new(),
1663 rollback_lsns,
1664 )
1665 .map_err(|e| {
1666 RepError::DatabaseError(format!(
1667 "live rollback to matchpoint failed: {e}"
1668 ))
1669 })?;
1670 }
1671
1672 // JE vlsnIndex.truncateFromTail(startVLSN, matchpointLSN): drop the
1673 // divergent VLSN tail so the reported range matches the recovered
1674 // state and streaming resumes from matchpoint + 1.
1675 self.vlsn_index.truncate_after(matchpoint_vlsn);
1676
1677 log::info!(
1678 "Node '{}': live syncup rolled back to matchpoint vlsn={} \
1679 (lsn={:#x}); {} tail entries truncated",
1680 self.config.node_name,
1681 matchpoint_vlsn,
1682 matchpoint_lsn,
1683 rollback_lsns.len(),
1684 );
1685 Ok(())
1686 }
1687
/// Test-only: clone the env's SHARED VLSN index `Arc`.
///
/// REP-6: the replica receive loop (`become_replica` ->
/// `EnvironmentLogWriter`) must feed THIS index — the one
/// `get_vlsn_range`, `flush_to_disk`, and election ranking read — not a
/// throwaway. Tests use this to build a writer the same way
/// `become_replica` does and assert the shared index advances.
///
/// Gated the same way as the other test seam on this type
/// (`install_consistency_tracker_for_test`), so it is available both to
/// in-crate unit tests and to builds with the `test-harness` feature.
#[cfg(any(test, feature = "test-harness"))]
pub fn vlsn_index_arc(&self) -> Arc<crate::vlsn::vlsn_index::VlsnIndex> {
    Arc::clone(&self.vlsn_index)
}
1699
1700 /// Return the list of replica names that currently have a `Feeder`
1701 /// tracker on this (master) node.
1702 ///
1703 /// Used by tests and operator tooling. The returned list reflects
1704 /// the master's view at the time of the call; subsequent
1705 /// `add_peer`/`remove_peer` calls may change it.
1706 pub fn feeder_replica_names(&self) -> Vec<String> {
1707 self.feeders.read().iter().map(|f| f.get_replica_name()).collect()
1708 }
1709
1710 /// Number of downstream connections this node has served via the JE
1711 /// `Feeder`/`MasterFeederSource` mechanism (`FeederRunner +
1712 /// EnvironmentLogScanner` reading this node's OWN WAL).
1713 ///
1714 /// A non-zero value PROVES this node fed a downstream replica by the
1715 /// SAME mechanism the master uses — a cascading replica and the master
1716 /// run the identical `PeerFeederService` → `FeederRunner` →
1717 /// `EnvironmentLogScanner` path (JE `FeederManager` → `Feeder` →
1718 /// `MasterFeederSource`). Used by the chained-replication test to assert
1719 /// the cascade does NOT use the in-memory pull fallback.
1720 pub fn wal_feeds_served(&self) -> u64 {
1721 self.wal_feeds_served.load(std::sync::atomic::Ordering::SeqCst)
1722 }
1723
1724 // -----------------------------------------------------------------------
1725 // C-C2 — active push feeder API
1726 // -----------------------------------------------------------------------
1727
1728 /// Register a channel for pushing log entries to a specific replica.
1729 ///
1730 /// When [`Self::become_master`] is called — or if the node is **already
1731 /// master** — a [`FeederRunner`] background thread is immediately spawned
1732 /// for this channel. The thread reads from a dedicated in-memory queue
1733 /// that is fed by [`Self::replicate_entry`] / [`Self::apply_entry`], and
1734 /// sends framed log entries to the replica over `channel`. Acks sent
1735 /// back by the replica are visible via
1736 /// [`Self::active_feeder_runner_acked_vlsn`].
1737 ///
1738 /// # Production vs. test use
1739 ///
1740 /// *Production*: pass a [`crate::net::TcpChannel`] connected to the
1741 /// replica's inbound feeder service.
1742 /// *Tests*: pass one half of a [`crate::net::LocalChannelPair`].
1743 ///
1744 /// # Note on push vs. pull
1745 ///
1746 /// Registering a channel activates the **push** path: the master
1747 /// initiates and owns the feeder connection. The existing **pull** path
1748 /// (`PeerFeederService` / `catch_up_from_peer`) continues to operate in
1749 /// parallel for replicas that connect proactively. Do not register a
1750 /// channel for a replica that already connects via the pull path, or
1751 /// entries may be delivered twice.
1752 ///
1753 /// If `become_master` was called *before* registering the channel, call
1754 /// this method afterward; it will spawn the FeederRunner immediately.
1755 pub fn register_feeder_channel(
1756 &self,
1757 replica_name: String,
1758 channel: Arc<dyn crate::net::Channel>,
1759 ) {
1760 {
1761 let mut ch = self.feeder_channels.lock().unwrap();
1762 ch.insert(replica_name.clone(), Arc::clone(&channel));
1763 }
1764 if self.is_master() {
1765 self.spawn_feeder_runner(replica_name, channel);
1766 }
1767 }
1768
1769 /// Return the last VLSN acknowledged by the FeederRunner for `replica_name`.
1770 ///
1771 /// Returns `0` if no FeederRunner is currently active for that replica
1772 /// (either `become_master` was not called yet, or no channel was
1773 /// registered). Use this to poll catch-up progress before shutdown.
1774 pub fn active_feeder_runner_acked_vlsn(&self, replica_name: &str) -> u64 {
1775 self.active_feeder_runners
1776 .lock()
1777 .unwrap()
1778 .get(replica_name)
1779 .map(|r| r.known_replica_vlsn())
1780 .unwrap_or(0)
1781 }
1782
1783 /// Spawn a FeederRunner thread for `replica_name` using `channel`.
1784 ///
1785 /// Creates a dedicated `PeerLogScanner` queue for the replica, registers
1786 /// it in `feeder_queues` so that future `replicate_entry` / `apply_entry`
1787 /// calls fan out into it, spawns the `FeederRunner::run` loop, and
1788 /// records the `Arc<FeederRunner>` in `active_feeder_runners`.
1789 ///
1790 /// Idempotent: if a FeederRunner is already active for `replica_name`
1791 /// (from a prior `become_master` call), it is replaced — the old channel
1792 /// should have been closed already via `close()`.
1793 ///
1794 /// **WAL-scanner auto-feed path (C-C2b)**: when a live `EnvironmentImpl`
1795 /// has been wired via `with_environment`, the FeederRunner thread uses an
1796 /// `EnvironmentLogScanner` as its source. Every `log_txn_commit` on the
1797 /// master writes a VLSN-tagged WAL entry (22-byte header); the scanner
1798 /// finds these entries and streams them to the replica automatically,
1799 /// without any `replicate_entry` call from the application.
1800 ///
1801 /// **Fallback path**: when no `EnvironmentImpl` is wired the runner reads
1802 /// from the in-memory `PeerLogScanner` queue populated by
1803 /// `replicate_entry` / `apply_entry` — the previous manual behaviour.
1804 fn spawn_feeder_runner(
1805 &self,
1806 replica_name: String,
1807 channel: Arc<dyn crate::net::Channel>,
1808 ) {
1809 // Dedicated entry queue: entries flowing from this master reach the
1810 // FeederRunner without competing with PeerFeederService.
1811 let queue = Arc::new(PeerLogScanner::new());
1812 {
1813 self.feeder_queues
1814 .write()
1815 .unwrap()
1816 .insert(replica_name.clone(), Arc::clone(&queue));
1817 }
1818
1819 // REP-9 Part 1: wire an ack sink so the FeederRunner forwards every
1820 // inbound replica ack to `env.record_ack(vlsn, replica_name)`, which
1821 // reaches BOTH the AckTracker (commit-blocking quorum) and the
1822 // matching `Feeder::acked_vlsn` (DTVLSN ranking). Without this the
1823 // ack reached only the runner's private `known_replica_vlsn`. The
1824 // sink holds a `Weak<Self>` so it never extends the env's lifetime;
1825 // if `self_weak` was never initialised we fall back to the plain
1826 // (sink-less) runner — `record_ack` is still reachable from tests.
1827 let runner = match self.self_weak.get().and_then(Weak::upgrade) {
1828 Some(env_arc) => {
1829 let weak = Arc::downgrade(&env_arc);
1830 let sink: crate::stream::feeder::AckSink =
1831 Arc::new(move |name: &str, vlsn: u64| {
1832 if let Some(env) = weak.upgrade() {
1833 env.record_ack(vlsn, name);
1834 }
1835 });
1836 Arc::new(FeederRunner::new_with_ack_sink(
1837 Arc::clone(&channel),
1838 1,
1839 replica_name.clone(),
1840 sink,
1841 ))
1842 }
1843 None => Arc::new(FeederRunner::new(Arc::clone(&channel), 1)),
1844 };
1845 let runner_clone = Arc::clone(&runner);
1846 let replica_clone = replica_name.clone();
1847
1848 // C-C2b: prefer EnvironmentLogScanner (WAL auto-feed) when env is
1849 // wired; fall back to in-memory queue (manual replicate_entry path)
1850 // otherwise.
1851 let env_opt = self.env_impl.lock().unwrap().clone();
1852
1853 let handle = std::thread::Builder::new()
1854 .name(format!("noxu-feeder-{}", replica_name))
1855 .spawn(move || {
1856 if let Some(env) = env_opt {
1857 if let Some(mut scanner) =
1858 EnvironmentLogScanner::new(&env, None)
1859 {
1860 log::info!(
1861 "FeederRunner for replica '{}': using \
1862 EnvironmentLogScanner (WAL auto-feed)",
1863 replica_clone,
1864 );
1865 let _ = runner_clone.run(&mut scanner);
1866 } else {
1867 log::warn!(
1868 "FeederRunner for replica '{}': \
1869 EnvironmentLogScanner unavailable, \
1870 falling back to in-memory queue",
1871 replica_clone,
1872 );
1873 let mut source = PeerScannerAdapter::new(queue, 0);
1874 let _ = runner_clone.run(&mut source);
1875 }
1876 } else {
1877 let mut source = PeerScannerAdapter::new(queue, 0);
1878 let _ = runner_clone.run(&mut source);
1879 }
1880 log::debug!(
1881 "FeederRunner for replica '{}' exited cleanly",
1882 replica_clone
1883 );
1884 })
1885 .expect("failed to spawn FeederRunner thread");
1886
1887 {
1888 let mut runners = self.active_feeder_runners.lock().unwrap();
1889 runners.insert(replica_name.clone(), Arc::clone(&runner));
1890 }
1891 self.io_threads.lock().unwrap().push(handle);
1892
1893 log::info!(
1894 "Node '{}' (master): FeederRunner thread spawned for replica '{}'",
1895 self.config.node_name.as_str(),
1896 replica_name,
1897 );
1898 }
1899
1900 // -----------------------------------------------------------------------
1901
1902 /// Bootstrap this node's environment by network-restoring all `.ndb`
1903 /// files from `peer_name` via the dispatcher's RESTORE service.
1904 ///
1905 /// Closes findings F2 / F4 of the 2026 review.
1906 ///
1907 /// The standalone `NetworkRestore::execute()` opens raw TCP and
1908 /// expects to drive the legacy `NetworkRestoreServer::start` listener.
1909 /// Production replicated environments host the RESTORE handler on the
1910 /// dispatcher, so this method routes through `execute_via_dispatcher`.
1911 ///
1912 /// `peer_name` must be a known peer in `GroupService`; on success the
1913 /// peer's `.ndb` files are written into `config.env_home`. Returns
1914 /// `Err` if `env_home` is `None`, the peer is unknown, or the restore
1915 /// fails for any reason.
1916 pub fn bootstrap_via_dispatcher(&self, peer_name: &str) -> Result<()> {
1917 let env_home = self.config.env_home.clone().ok_or_else(|| {
1918 RepError::ConfigError(
1919 "bootstrap_via_dispatcher requires env_home in RepConfig"
1920 .into(),
1921 )
1922 })?;
1923 let peer_info = self
1924 .group_service
1925 .get_all_nodes()
1926 .into_iter()
1927 .find(|n| n.name == peer_name)
1928 .ok_or_else(|| {
1929 RepError::ConfigError(format!(
1930 "peer '{}' not registered in group '{}'",
1931 peer_name, self.config.group_name,
1932 ))
1933 })?;
1934
1935 let cfg = NetworkRestoreConfig {
1936 source_node: peer_info.name.clone(),
1937 source_host: peer_info.host.clone(),
1938 source_port: peer_info.port,
1939 retain_log_files: true,
1940 };
1941 let restore = NetworkRestore::new(cfg).with_local_dir(env_home);
1942 restore.execute_via_dispatcher()?;
1943 log::info!(
1944 "Node '{}' bootstrapped via dispatcher from '{}' ({}:{})",
1945 self.config.node_name,
1946 peer_info.name,
1947 peer_info.host,
1948 peer_info.port,
1949 );
1950 Ok(())
1951 }
1952
/// Get replication statistics.
///
/// Returns a shared reference to the `RepStats` owned by this
/// environment; nothing is copied or reset by this call.
pub fn get_stats(&self) -> &RepStats {
    &self.stats
}
1961
/// Get the ack tracker.
///
/// Returns a shared reference to the `AckTracker` owned by this
/// environment.
pub fn get_ack_tracker(&self) -> &AckTracker {
    &self.ack_tracker
}
1966
1967 /// Ensure the node state machine is in Unknown state, transitioning
1968 /// from Detached if necessary. This is needed because the state machine
1969 /// only allows Detached -> Unknown -> Master/Replica.
1970 pub fn ensure_unknown_state(&self) -> Result<()> {
1971 let current = self.node_state.get_state();
1972 match current {
1973 NodeState::Unknown => Ok(()),
1974 NodeState::Detached => {
1975 self.node_state.transition_to(NodeState::Unknown)?;
1976 Ok(())
1977 }
1978 // Master and Replica must transition through Unknown before
1979 // joining a new group or reconnecting.
1980 NodeState::Master | NodeState::Replica => {
1981 self.node_state.transition_to(NodeState::Unknown)?;
1982 Ok(())
1983 }
1984 NodeState::Shutdown => {
1985 Err(RepError::StateError("Node is shut down".to_string()))
1986 }
1987 }
1988 }
1989
1990 /// Transition to master state.
1991 ///
1992 /// Transitions this node to Master state for the given election term.
1993 /// As master, the node can accept write operations and feed log entries
1994 /// to replicas.
1995 ///
1996 /// **Active push-feeder** (C-C2): if feeder channels have been registered
1997 /// via [`Self::register_feeder_channel`] before this call, a
1998 /// [`FeederRunner`] background thread is spawned per channel.
1999 ///
2000 /// **WAL-scanner auto-feed path (C-C2b, v3.3.0)**: when
2001 /// [`Self::with_environment`] has been called before `become_master`,
2002 /// each `FeederRunner` thread uses an [`EnvironmentLogScanner`] as its
2003 /// source. Every `log_txn_commit` on the master writes a VLSN-tagged
2004 /// 22-byte WAL entry (via `LogManager::log_with_vlsn`); the scanner
2005 /// discovers these entries and streams them to replicas automatically,
2006 /// without any [`Self::replicate_entry`] call from the application.
2007 ///
2008 /// **Fallback path**: when no `EnvironmentImpl` is wired, the runner
2009 /// reads from the in-memory queue populated by [`Self::replicate_entry`] /
2010 /// [`Self::apply_entry`].
2011 ///
2012 /// If no feeder channels are registered, this call registers per-replica
2013 /// `Feeder` tracker structs for `AckTracker` bookkeeping only. In that
2014 /// case replicas must connect proactively to the `PEER_FEEDER` pull
2015 /// service to receive entries.
2016 pub fn become_master(&self, term: u64) -> Result<()> {
2017 if self.is_shutdown() {
2018 return Err(RepError::StateError(
2019 "Cannot become master: environment is closed".to_string(),
2020 ));
2021 }
2022
2023 // JE invariant: only `Electable` nodes can become master. `Secondary`,
2024 // `Monitor`, and `Arbiter` are not electable and must be rejected at
2025 // the API layer (mirrors JE `ExceptionTest`). See
2026 // `NodeType::can_be_master`.
2027 if !self.config.node_type.can_be_master() {
2028 return Err(RepError::InvalidStateTransition(format!(
2029 "node '{}' has type {} which is not electable as master",
2030 self.config.node_name.as_str(),
2031 self.config.node_type,
2032 )));
2033 }
2034
2035 // Ensure we can reach Master state (may need Detached -> Unknown first)
2036 self.ensure_unknown_state()?;
2037
2038 let old_state = self.node_state.get_state();
2039 self.node_state.transition_to(NodeState::Master)?;
2040 self.master_tracker.set_master(self.config.node_name.as_str(), term);
2041
2042 // --- F9: spawn Feeder trackers for each known replica -------------
2043 //
2044 // Closes finding F9 of the 2026 review.
2045 // The architecture is pull-based: replicas pull from the master's
2046 // `PEER_FEEDER` service via `catch_up_from_peer`. However, the
2047 // master must:
2048 // 1. Track each replica via a `Feeder` so AckTracker bookkeeping
2049 // can attribute replica acks to the right node.
2050 // 2. Push its own writes into `peer_scanner` so replicas pulling
2051 // from `PEER_FEEDER` actually receive entries (`replicate_entry`).
2052 //
2053 // Here we ensure step 1: every known electable peer in the group
2054 // gets a `Feeder` entry.
2055 {
2056 let mut feeders = self.feeders.write();
2057 // Drop any stale feeders left over from a prior role. A
2058 // `Feeder` is just an in-memory tracker; recreating it is
2059 // cheap and avoids state inversion bugs across role changes.
2060 feeders.clear();
2061 for peer in self.group_service.get_all_nodes() {
2062 if peer.name == self.config.node_name {
2063 continue;
2064 }
2065 if peer.node_type != crate::node_type::NodeType::Electable
2066 && peer.node_type != crate::node_type::NodeType::Secondary
2067 {
2068 // Arbiters do not receive log entries.
2069 continue;
2070 }
2071 feeders.push(Feeder::new(peer.name.clone()));
2072 log::debug!(
2073 "Node '{}' (master, term={}): registered Feeder for \
2074 replica '{}'",
2075 self.config.node_name.as_str(),
2076 term,
2077 peer.name,
2078 );
2079 }
2080 }
2081
2082 // For observability, log the count.
2083 log::info!(
2084 "Node '{}' became master for term {} \
2085 (feeder trackers: {} known replicas)",
2086 self.config.node_name.as_str(),
2087 term,
2088 self.feeders.read().len(),
2089 );
2090
2091 // C-C2: spawn FeederRunner threads for pre-registered channels.
2092 //
2093 // When `register_feeder_channel` was called before `become_master`,
2094 // the channels are already in `feeder_channels`. Drain them and
2095 // spawn a FeederRunner per replica. The FeederRunner reads from a
2096 // dedicated `PeerLogScanner` queue (populated by `replicate_entry`
2097 // fan-out) and pushes framed log entries to the replica over the
2098 // registered channel. Acks from the replica are tracked in the
2099 // FeederRunner and visible via `active_feeder_runner_acked_vlsn`.
2100 {
2101 let channels: Vec<(String, Arc<dyn crate::net::Channel>)> = self
2102 .feeder_channels
2103 .lock()
2104 .unwrap()
2105 .iter()
2106 .map(|(k, v)| (k.clone(), Arc::clone(v)))
2107 .collect();
2108 for (replica_name, channel) in channels {
2109 self.spawn_feeder_runner(replica_name, channel);
2110 }
2111 }
2112
2113 // --- WAL-backed PEER_FEEDER for pull-path replicas -------------------
2114 //
2115 // The master's writes go to its WAL (VLSN-tagged 22-byte headers) and
2116 // its VLSN index, but NOT necessarily to the in-memory `peer_scanner`
2117 // (e.g. `register_vlsn_typed` only updates the index). A replica that
2118 // pulls via the `PEER_FEEDER` service therefore finds an empty
2119 // in-memory scanner and gets `NeedsRestore`.
2120 //
2121 // Re-register PEER_FEEDER with a WAL-backed source so a pulling
2122 // replica receives the VLSN-tagged stream straight from the master's
2123 // OWN WAL via the same `EnvironmentLogScanner` + `FeederRunner` used
2124 // throughout. Faithful to JE `MasterFeederSource(repImpl, vlsnIndex,
2125 // startVLSN)`, which reads the VLSNIndex + log regardless of node
2126 // role; `FeederManager` runs feeders on whatever node holds the data.
2127 // (The same registration runs, gated on `cascade_feeding`, in
2128 // `become_replica` so a mid-tier replica can cascade downstream.)
2129 if let Some(env) = self.env_impl.lock().unwrap().clone()
2130 && let Some(ref dispatcher) = self.tcp_dispatcher
2131 {
2132 let wal_source = crate::stream::peer_feeder::WalFeederSource::new(
2133 Arc::clone(&env),
2134 Arc::clone(&self.vlsn_index),
2135 );
2136 let svc = PeerFeederService::with_wal_source_counted(
2137 Arc::clone(&self.peer_scanner),
2138 wal_source,
2139 Arc::clone(&self.wal_feeds_served),
2140 );
2141 dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(svc));
2142 log::debug!(
2143 "Node '{}' (master): PEER_FEEDER now serves replicas from \
2144 its own WAL",
2145 self.config.node_name.as_str(),
2146 );
2147 }
2148
2149 // -------------------------------------------------------------------
2150
2151 // Notify listeners
2152 self.notify_listeners(old_state, NodeState::Master);
2153
2154 Ok(())
2155 }
2156
2157 /// Transition to replica state with the given master.
2158 ///
2159 /// Transitions this node to Replica state. The node will receive log
2160 /// entries from the specified master.
2161 ///
2162 /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
2163 /// the method prepares an `EnvironmentLogWriter` so that replicated
2164 /// entries can be written to the local log. The actual network connection
2165 /// is established by the `TcpServiceDispatcher`; this method logs intent.
2166 ///
2167 /// In HA.
2168 pub fn become_replica(&self, master_name: &str) -> Result<()> {
2169 if self.is_shutdown() {
2170 return Err(RepError::StateError(
2171 "Cannot become replica: environment is closed".to_string(),
2172 ));
2173 }
2174
2175 // Ensure we can reach Replica state (may need Detached -> Unknown first)
2176 self.ensure_unknown_state()?;
2177
2178 let old_state = self.node_state.get_state();
2179 self.node_state.transition_to(NodeState::Replica)?;
2180 self.master_tracker.set_master(master_name, 0);
2181 self.replica_stream.set_master(master_name);
2182 self.replica_stream.set_state(
2183 crate::stream::replica_stream::ReplicaStreamState::Connecting,
2184 );
2185
2186 // --- G19: start replica receive loop --------------------------------
2187 //
2188 // Connects to the master's PEER_FEEDER service and runs a
2189 // ReplicaReceiver loop in a background thread. The receiver writes
2190 // replicated entries via EnvironmentLogWriter.
2191 if let Some(env) = self.env_impl.lock().unwrap().clone() {
2192 if let Some(log_mgr) = env.get_log_manager() {
2193 // REP-6: feed the env's SHARED, persisted VLSN index (the one
2194 // flush_to_disk persists and get_vlsn_range / election ranking
2195 // read) into the replica receive loop — NOT a throwaway. Using
2196 // a fresh index would leave the persisted vlsn.idx, the
2197 // reported VLSN range, and the DTVLSN-ranking own_vlsn lagging
2198 // the actually-received stream, widening catch-up (or forcing
2199 // an unnecessary network restore) after a clean restart.
2200 // JE: the replica's VLSNIndex IS the environment's persisted
2201 // index (see VLSNIndex).
2202 let vlsn_index = Arc::clone(&self.vlsn_index);
2203
2204 // --- Chained replication: start a WAL-backed feeder source ---
2205 //
2206 // When `cascade_feeding` is enabled, re-register this node's
2207 // PEER_FEEDER service with a WAL-backed source so a DOWNSTREAM
2208 // replica can connect and receive the VLSN-tagged log stream
2209 // FROM THIS REPLICA's OWN WAL (the bytes it received + persisted
2210 // via EnvironmentLogWriter::log_with_vlsn). The feeder uses the
2211 // same EnvironmentLogScanner + FeederRunner the master uses.
2212 //
2213 // Faithful to JE's cascading-feeder model: the same
2214 // FeederManager/Feeder/FeederSource machinery runs on any node
2215 // that holds the data. `FeederSource` is documented as "a real
2216 // Master OR a Replica in a Replica chain that is replaying log
2217 // records it received from some other source"
2218 // (`FeederSource.java`); `MasterFeederSource(repImpl, vlsnIndex,
2219 // startVLSN)` reads the VLSNIndex + log regardless of role.
2220 //
2221 // Default OFF (master-direct) preserves current behaviour: a
2222 // replica's PEER_FEEDER stays backed by the in-memory pull
2223 // scanner unless cascade is explicitly enabled.
2224 if self.config.cascade_feeding {
2225 if let Some(ref dispatcher) = self.tcp_dispatcher {
2226 let wal_source =
2227 crate::stream::peer_feeder::WalFeederSource::new(
2228 Arc::clone(&env),
2229 Arc::clone(&self.vlsn_index),
2230 );
2231 let svc = PeerFeederService::with_wal_source_counted(
2232 Arc::clone(&self.peer_scanner),
2233 wal_source,
2234 Arc::clone(&self.wal_feeds_served),
2235 );
2236 dispatcher
2237 .register(PEER_FEEDER_SERVICE_NAME, Arc::new(svc));
2238 log::info!(
2239 "Node '{}' (replica): cascade feeding ENABLED — \
2240 PEER_FEEDER now serves downstream replicas from \
2241 its own WAL via the SAME FeederRunner + \
2242 EnvironmentLogScanner mechanism the master uses \
2243 (JE Feeder + MasterFeederSource)",
2244 self.config.node_name.as_str(),
2245 );
2246 } else {
2247 log::warn!(
2248 "Node '{}': cascade_feeding set but no TCP \
2249 dispatcher; downstream replicas cannot connect",
2250 self.config.node_name.as_str(),
2251 );
2252 }
2253 }
2254
2255 // Resolve the master's socket address from the GroupService.
2256 let master_addr_opt: Option<SocketAddr> = self
2257 .group_service
2258 .get_all_nodes()
2259 .iter()
2260 .find(|n| n.name == master_name)
2261 .and_then(|info| {
2262 format!("{}:{}", info.host, info.port)
2263 .parse::<SocketAddr>()
2264 .ok()
2265 });
2266
2267 let node_name = self.config.node_name.clone();
2268 let master = master_name.to_string();
2269 let vlsn_index_clone = Arc::clone(&vlsn_index);
2270 // Live shutdown flag (shared Arc): the receive loop polls it
2271 // so `close()` can break the blocking upstream receive and
2272 // join this thread — vital for a mid-tier replica in a chain
2273 // that is closed before its upstream feeder.
2274 let shutdown = Arc::clone(&self.io_shutdown);
2275 // Wave 9-A fix 2: capture a Weak<Self> so the I/O thread
2276 // can call `bootstrap_via_dispatcher` automatically when
2277 // the master signals `NeedsRestore`. When the env was
2278 // never registered with `init_self_weak` (raw
2279 // `Arc::new(Self::new(...))` without going through
2280 // `open()` or the test harness), the weak ref is `None`
2281 // and we fall back to operator-driven bootstrap.
2282 let self_weak: Option<Weak<Self>> =
2283 self.self_weak.get().cloned();
2284
2285 // REP-7 (B): clone the live EnvironmentImpl into the replica
2286 // thread so the writer can drive a ReplicaReplay that applies
2287 // each streamed entry to the live in-memory tree.
2288 let env_for_replay = Arc::clone(&env);
2289
2290 // REP-10 (C): build the ReplicaReplay HERE (not inside the
2291 // closure) so we can publish its REP-7 `last_applied_vlsn`
2292 // handle to a ConsistencyTracker BEFORE the thread starts
2293 // streaming. A read on this replica then waits on the same
2294 // handle the replay thread advances. Port of
2295 // RepImpl.getConsistency / Replica.getConsistencyTracker.
2296 let replay = noxu_dbi::ReplicaReplay::new(env_for_replay);
2297 let tracker = crate::ConsistencyTracker::new(
2298 replay.last_applied_vlsn_handle(),
2299 );
2300 *self.consistency_tracker.lock().unwrap() = Some(tracker);
2301
2302 let handle = std::thread::Builder::new()
2303 .name(format!("noxu-replica-{}", node_name))
2304 .spawn(move || {
2305 // REP-7 (B): wire the live replay-apply path so reads
2306 // on the replica see replicated data without a
2307 // restart. JE: the replica writes each entry to its
2308 // log, then Replay.replayEntry applies it to the tree.
2309 let mut writer = EnvironmentLogWriter::with_replay(
2310 log_mgr,
2311 vlsn_index_clone,
2312 replay,
2313 );
2314
2315 let Some(addr) = master_addr_opt else {
2316 log::warn!(
2317 "noxu-replica-{}: master '{}' address not in RepGroup; \
2318 waiting for TCP dispatcher connection",
2319 node_name, master,
2320 );
2321 return;
2322 };
2323
2324 // Catch-up loop: catch up, observe NeedsRestore,
2325 // optionally auto-bootstrap, retry once. We cap
2326 // the retry count at MAX_AUTO_BOOTSTRAP_ATTEMPTS
2327 // (small) so a misbehaving master does not loop
2328 // forever consuming network bandwidth.
2329 const MAX_AUTO_BOOTSTRAP_ATTEMPTS: u32 = 2;
2330 let mut attempts: u32 = 0;
2331 loop {
2332 // Observe close before (re)connecting so a
2333 // shutdown between catch-up attempts exits
2334 // promptly.
2335 if shutdown.load(Ordering::SeqCst) {
2336 return;
2337 }
2338 log::info!(
2339 "noxu-replica-{}: connecting to master '{}' at {}",
2340 node_name, master, addr,
2341 );
2342 match crate::stream::peer_feeder::catch_up_from_peer_until(
2343 addr, 0, &mut writer, &shutdown,
2344 ) {
2345 Ok(true) => {
2346 log::info!(
2347 "noxu-replica-{}: catch-up complete from '{}'",
2348 node_name, master,
2349 );
2350 return;
2351 }
2352 Ok(false) => {
2353 // F2/F4: master signals NeedsRestore.
2354 // Wave 9-A fix 2: if a Weak<Self> was
2355 // plumbed in, upgrade it and call
2356 // `bootstrap_via_dispatcher` ourselves
2357 // so the replica auto-bootstraps and
2358 // resumes catch-up without operator
2359 // intervention.
2360 log::warn!(
2361 "noxu-replica-{}: master '{}' requires restore",
2362 node_name, master,
2363 );
2364 attempts += 1;
2365 if attempts > MAX_AUTO_BOOTSTRAP_ATTEMPTS {
2366 log::error!(
2367 "noxu-replica-{}: exceeded \
2368 auto-bootstrap attempts ({}); giving up",
2369 node_name,
2370 MAX_AUTO_BOOTSTRAP_ATTEMPTS,
2371 );
2372 return;
2373 }
2374 let env_arc = match self_weak
2375 .as_ref()
2376 .and_then(Weak::upgrade)
2377 {
2378 Some(e) => e,
2379 None => {
2380 // No back-ref or env dropped:
2381 // fall back to operator-driven
2382 // bootstrap and exit cleanly.
2383 log::warn!(
2384 "noxu-replica-{}: no back-reference \
2385 available; operator must call \
2386 bootstrap_via_dispatcher manually",
2387 node_name,
2388 );
2389 return;
2390 }
2391 };
2392 if env_arc.is_shutdown() {
2393 return;
2394 }
2395 log::info!(
2396 "noxu-replica-{}: auto-bootstrapping via \
2397 dispatcher from '{}' (attempt {})",
2398 node_name, master, attempts,
2399 );
2400 match env_arc
2401 .bootstrap_via_dispatcher(&master)
2402 {
2403 Ok(()) => {
2404 log::info!(
2405 "noxu-replica-{}: auto-bootstrap \
2406 succeeded; resuming catch-up",
2407 node_name,
2408 );
2409 // Drop the strong ref before
2410 // re-entering catch-up so we
2411 // do not keep the env alive
2412 // longer than necessary.
2413 drop(env_arc);
2414 continue;
2415 }
2416 Err(e) => {
2417 log::error!(
2418 "noxu-replica-{}: auto-bootstrap \
2419 failed: {}",
2420 node_name, e,
2421 );
2422 return;
2423 }
2424 }
2425 }
2426 Err(e) => {
2427 if !shutdown.load(Ordering::SeqCst) {
2428 log::error!(
2429 "noxu-replica-{}: error from master '{}': {e}",
2430 node_name, master,
2431 );
2432 }
2433 return;
2434 }
2435 }
2436 }
2437 })
2438 .expect("failed to spawn noxu-replica thread");
2439
2440 self.io_threads.lock().unwrap().push(handle);
2441
2442 log::debug!(
2443 "Node '{}': replica receive thread started for master '{}'",
2444 self.config.node_name.as_str(),
2445 master_name,
2446 );
2447 } else {
2448 log::warn!(
2449 "Node '{}': no LogManager available (read-only env?); \
2450 replica I/O loop not started",
2451 self.config.node_name.as_str(),
2452 );
2453 }
2454 }
2455 // -------------------------------------------------------------------
2456
2457 // Notify listeners
2458 self.notify_listeners(old_state, NodeState::Replica);
2459
2460 log::info!(
2461 "Node '{}' became replica of master '{}'",
2462 self.config.node_name.as_str(),
2463 master_name
2464 );
2465 Ok(())
2466 }
2467
2468 /// Initiate a master transfer to the target node.
2469 ///
2470 ///
2471 ///
2472 /// Transfers the current master state from this node to one of the
2473 /// electable replicas. The replica that is actually chosen to be the new
2474 /// master is the one with which the Master Transfer can be completed most
2475 /// rapidly. The transfer operation ensures that all changes at this node
2476 /// are available at the new master upon conclusion of the operation.
2477 pub fn transfer_master(&self, config: MasterTransferConfig) -> Result<()> {
2478 if self.is_shutdown() {
2479 return Err(RepError::StateError(
2480 "Cannot transfer master: environment is closed".to_string(),
2481 ));
2482 }
2483
2484 if !self.is_master() {
2485 return Err(RepError::InvalidState(
2486 "Master transfer can only be initiated on the master node"
2487 .to_string(),
2488 ));
2489 }
2490
2491 log::info!(
2492 "Node '{}' initiating master transfer to '{}'",
2493 self.config.node_name.as_str(),
2494 config.target_node,
2495 );
2496
2497 // Closes finding F7 of the 2026 review.
2498 //
2499 // Steps:
2500 // 1. Locate the target's address.
2501 // 2. Compute the new term (current observed term + 1).
2502 // 3. Send TRANSFER_MASTER to the target — it will become master.
2503 // 4. Send TRANSFER_MASTER (with the same term + new master name) to
2504 // every other peer so they re-target.
2505 // 5. Demote self to Replica of the target.
2506 //
2507 // The transfer is best-effort: a peer that doesn't ack is logged
2508 // and skipped. The election driver will reconcile any divergence
2509 // on the next election round.
2510
2511 let target_addr = self
2512 .group_service
2513 .get_all_nodes()
2514 .into_iter()
2515 .find(|n| n.name == config.target_node)
2516 .and_then(|n| {
2517 format!("{}:{}", n.host, n.port)
2518 .parse::<std::net::SocketAddr>()
2519 .ok()
2520 })
2521 .ok_or_else(|| {
2522 RepError::ConfigError(format!(
2523 "transfer_master: target '{}' not registered or has bad address",
2524 config.target_node
2525 ))
2526 })?;
2527
2528 let new_term = self.master_tracker.get_term().saturating_add(1);
2529
2530 // 1. Tell the target to become master at the new term.
2531 let target_ack = crate::group_admin::send_transfer_master(
2532 target_addr,
2533 &config.target_node,
2534 new_term,
2535 )
2536 .map_err(|e| {
2537 RepError::NetworkError(format!(
2538 "transfer_master: failed to signal target '{}': {}",
2539 config.target_node, e
2540 ))
2541 })?;
2542 if !target_ack {
2543 return Err(RepError::StateError(format!(
2544 "transfer_master: target '{}' rejected the transfer",
2545 config.target_node
2546 )));
2547 }
2548
2549 // 2. Inform all other peers (best-effort).
2550 for peer in self.group_service.get_all_nodes() {
2551 if peer.name == self.config.node_name
2552 || peer.name == config.target_node
2553 {
2554 continue;
2555 }
2556 if let Ok(addr) = format!("{}:{}", peer.host, peer.port).parse() {
2557 let _ = crate::group_admin::send_transfer_master(
2558 addr,
2559 &config.target_node,
2560 new_term,
2561 );
2562 }
2563 }
2564
2565 // 3. Demote self to Replica of the new master.
2566 self.become_replica(&config.target_node)?;
2567
2568 log::info!(
2569 "Node '{}' transferred master to '{}' at term {}",
2570 self.config.node_name.as_str(),
2571 config.target_node,
2572 new_term,
2573 );
2574 Ok(())
2575 }
2576
    /// Register a VLSN (as master, after writing a log entry).
    ///
    /// Records `vlsn` in this node's VLSN index at the log position given by
    /// `file_number` / `file_offset`. This is called by the master after it
    /// writes a replicated log entry.
    ///
    /// No log entry type is passed to the index here; use
    /// [`Self::register_vlsn_typed`] when the entry type is known.
    pub fn register_vlsn(&self, vlsn: u64, file_number: u32, file_offset: u32) {
        self.vlsn_index.register(vlsn, file_number, file_offset);
    }
2584
    /// Register a VLSN→LSN mapping together with its `LogEntryType`, so
    /// `lastSync` / `lastTxnEnd` advance (JE
    /// `VLSNRange.getUpdateForNewMapping`).
    ///
    /// All four arguments are forwarded unchanged to the VLSN index's
    /// `register_with_type`. Used by the syncup driver/tests that apply
    /// VLSN-tagged entries to a real log and need the sync/commit boundaries
    /// to track the stream.
    pub fn register_vlsn_typed(
        &self,
        vlsn: u64,
        file_number: u32,
        file_offset: u32,
        entry_type: noxu_log::LogEntryType,
    ) {
        self.vlsn_index.register_with_type(
            vlsn,
            file_number,
            file_offset,
            entry_type,
        );
    }
2603
2604 /// Replicate a freshly committed log entry from the master.
2605 ///
2606 /// Closes finding F9 of the 2026 review.
2607 ///
2608 /// Combines `register_vlsn` with a push into the in-memory
2609 /// `peer_scanner` so that downstream replicas pulling from this
2610 /// node's `PEER_FEEDER` service (via `catch_up_from_peer`) can
2611 /// stream the entry without round-tripping through the on-disk
2612 /// log. The local log is still the source of truth; the peer
2613 /// scanner is a fast-path cache that bounds itself via
2614 /// `PeerLogScanner::with_capacity` so old entries are evicted.
2615 ///
2616 /// Should be called by the master after the local commit has
2617 /// fsynced. Calling on a non-master is harmless (the peer
2618 /// scanner cache is also used by replicas) but is logged at trace
2619 /// level for diagnostics.
2620 pub fn replicate_entry(
2621 &self,
2622 vlsn: u64,
2623 file_number: u32,
2624 file_offset: u32,
2625 entry_type: u8,
2626 data: Vec<u8>,
2627 ) {
2628 // Register VLSN -> LSN, dispatching entry type so lastSync /
2629 // lastTxnEnd advance (REP-5; JE VLSNRange.getUpdateForNewMapping).
2630 // An unknown type byte falls back to extend-only registration.
2631 match noxu_log::LogEntryType::from_type_num(entry_type) {
2632 Some(et) => self.vlsn_index.register_with_type(
2633 vlsn,
2634 file_number,
2635 file_offset,
2636 et,
2637 ),
2638 None => self.vlsn_index.register(vlsn, file_number, file_offset),
2639 }
2640 // Pull path: shared peer_scanner serves replicas connecting via
2641 // PeerFeederService (catch_up_from_peer).
2642 self.peer_scanner.push(vlsn, entry_type, data.clone());
2643 // Push path (C-C2): fan out to per-replica FeederRunner queues so
2644 // that threads spawned by become_master can stream entries to each
2645 // registered replica without competing with PeerFeederService.
2646 {
2647 let queues = self.feeder_queues.read().unwrap();
2648 for queue in queues.values() {
2649 queue.push(vlsn, entry_type, data.clone());
2650 }
2651 }
2652 if !self.is_master() {
2653 log::trace!(
2654 "replicate_entry called on non-master node '{}': vlsn={}, type={}",
2655 self.config.node_name,
2656 vlsn,
2657 entry_type,
2658 );
2659 }
2660 }
2661
2662 /// Apply a replicated entry (as replica).
2663 ///
2664 /// Applies a log entry received from the master. This is called by the
2665 /// replica stream handler after receiving an entry from the feeder.
2666 ///
2667 /// `data` is the wire-encoded log-record payload. When the
2668 /// replicated environment has not been wired to a local
2669 /// `noxu_db::Environment` (i.e., before `with_environment` is
2670 /// called) the payload is forwarded into the in-memory peer
2671 /// scanner so that downstream replicas attached to the
2672 /// `PEER_FEEDER` service can re-stream it; the local log is **not**
2673 /// updated. This is documented behaviour rather than a stub — see
2674 /// the 2026 review finding #26 (medium) for the
2675 /// `with_environment`-required local-apply path.
2676 /// cleanup (rep info F35: `_data` placeholder) renames the leading
2677 /// underscore so reviewers don't read it as a TODO.
2678 pub fn apply_entry(
2679 &self,
2680 vlsn: u64,
2681 entry_type: u8,
2682 data: Vec<u8>,
2683 ) -> Result<()> {
2684 if self.is_shutdown() {
2685 return Err(RepError::StateError(
2686 "Cannot apply entry: environment is closed".to_string(),
2687 ));
2688 }
2689
2690 // Register the VLSN in the index, dispatching entry type so
2691 // lastSync/lastTxnEnd advance (REP-5; JE
2692 // VLSNRange.getUpdateForNewMapping).
2693 match noxu_log::LogEntryType::from_type_num(entry_type) {
2694 Some(et) => self.vlsn_index.register_with_type(vlsn, 0, 0, et),
2695 None => self.vlsn_index.register(vlsn, 0, 0),
2696 }
2697
2698 // Push into the peer log scanner so downstream replicas can
2699 // receive this entry via the PEER_FEEDER service.
2700 self.peer_scanner.push(vlsn, entry_type, data.clone());
2701 // C-C2 push path: fan out to per-replica FeederRunner queues.
2702 {
2703 let queues = self.feeder_queues.read().unwrap();
2704 for queue in queues.values() {
2705 queue.push(vlsn, entry_type, data.clone());
2706 }
2707 }
2708
2709 log::trace!(
2710 "Applied replicated entry: vlsn={}, type={}",
2711 vlsn,
2712 entry_type
2713 );
2714 Ok(())
2715 }
2716
2717 /// Record an ack from a replica (as master).
2718 ///
2719 /// Records that the specified replica has acknowledged processing up to
2720 /// the given VLSN. This is used by the master to track durability
2721 /// guarantees.
2722 pub fn record_ack(&self, vlsn: u64, replica_name: &str) {
2723 // Only acks from ELECTABLE replicas count toward the durability
2724 // quorum (JE DurabilityQuorum.replicaAcksQualify: Monitors and
2725 // Secondaries do not qualify). An ack from a non-electable / unknown
2726 // node is recorded for stats elsewhere but must not satisfy the
2727 // ReplicaAckPolicy. If the node is unknown to the group view we err
2728 // toward NOT counting it.
2729 let qualifies = self
2730 .get_rep_group()
2731 .get_node(replica_name)
2732 .map(|n| n.node_type().is_electable())
2733 .unwrap_or(false);
2734 if qualifies {
2735 self.ack_tracker.record_ack(vlsn, replica_name);
2736 }
2737 // REP-9 Part 1: advance the matching `Feeder::acked_vlsn` high-water
2738 // mark (read by `update_dtvlsn_from_feeders` and exposed via
2739 // `get_acked_vlsn`). The production `FeederRunner` previously updated
2740 // only its private `known_replica_vlsn`, so the DTVLSN ranking never
2741 // saw production progress (JE `Feeder.getReplicaTxnEndVLSN`). We
2742 // record the high-water for *any* replica (electable or not); the
2743 // electable filter is reapplied when DTVLSN/quorum is computed.
2744 for feeder in self.feeders.read().iter() {
2745 if feeder.get_replica_name() == replica_name {
2746 feeder.record_ack(vlsn);
2747 break;
2748 }
2749 }
2750 // Recompute the DTVLSN from feeder progress whenever an ack lands.
2751 self.update_dtvlsn_from_feeders();
2752 // REP-9: wake any committer parked in `await_replica_acks`. Its
2753 // satisfaction predicate is the high-water feeder count, not an
2754 // exact-VLSN registration, so we must notify unconditionally (the
2755 // AckTracker's own `record_ack` only notifies when the exact VLSN was
2756 // registered, which the per-frame feeder acks generally are not).
2757 self.ack_tracker.notify_waiters();
2758 }
2759
2760 /// Returns the current Durable Transaction VLSN (D7, JE RepNode.getDTVLSN).
2761 /// The highest VLSN replicated to a majority of electable replicas; 0 if
2762 /// none yet. Used by the election ranking so the most-durable node wins.
2763 pub fn get_dtvlsn(&self) -> u64 {
2764 self.dtvlsn.load(std::sync::atomic::Ordering::Acquire)
2765 }
2766
2767 /// Advance the DTVLSN to `candidate` if it is greater (JE
2768 /// RepNode.updateDTVLSN — an `AtomicLongMax.updateMax`). The DTVLSN can
2769 /// only move forward. Returns the resulting (possibly unchanged) value.
2770 pub fn update_dtvlsn(&self, candidate: u64) -> u64 {
2771 use std::sync::atomic::Ordering;
2772 let mut cur = self.dtvlsn.load(Ordering::Acquire);
2773 while candidate > cur {
2774 match self.dtvlsn.compare_exchange_weak(
2775 cur,
2776 candidate,
2777 Ordering::AcqRel,
2778 Ordering::Acquire,
2779 ) {
2780 Ok(_) => return candidate,
2781 Err(observed) => cur = observed,
2782 }
2783 }
2784 cur
2785 }
2786
    /// Set the DTVLSN from the replication stream (JE RepNode.setDTVLSN —
    /// used exclusively by the replica, which maintains the DTVLSN from
    /// commit/abort records).
    ///
    /// Delegates to [`Self::update_dtvlsn`], so the update is advance-only:
    /// an out-of-order or stale record cannot move the DTVLSN backward. The
    /// resulting value returned by `update_dtvlsn` is discarded.
    pub fn set_dtvlsn(&self, vlsn: u64) {
        self.update_dtvlsn(vlsn);
    }
2794
2795 /// Master-side DTVLSN computation (D7, JE FeederManager.updateDTVLSN):
2796 /// across the *qualifying* (electable) feeders whose replica-txn-end VLSN
2797 /// exceeds the current DTVLSN, take the minimum; once a SIMPLE_MAJORITY
2798 /// ack-count of them exceeds the current value, advance the DTVLSN to that
2799 /// minimum (a transaction is durable once a majority hold it).
2800 fn update_dtvlsn_from_feeders(&self) {
2801 if !self.is_master() {
2802 return;
2803 }
2804 let curr = self.get_dtvlsn();
2805
2806 // SIMPLE_MAJORITY required-ack-count over the electable group,
2807 // computed the same way as await_replica_acks.
2808 let group = self.get_rep_group();
2809 let electable_peers: u32 = group
2810 .get_nodes()
2811 .iter()
2812 .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2813 .count() as u32;
2814 let electable_count = electable_peers + 1; // +1 for self/master
2815 // required electable acks for SIMPLE_MAJORITY = floor(n/2) replicas
2816 // (the master self-acks; a majority is reached when this many peers
2817 // also hold the VLSN).
2818 let durable_ack_count = electable_count / 2;
2819 if durable_ack_count == 0 {
2820 // Single-node (or majority is self alone): the master's own log is
2821 // immediately durable up to its latest VLSN.
2822 self.update_dtvlsn(self.get_current_vlsn());
2823 return;
2824 }
2825
2826 let mut min = u64::MAX;
2827 let mut ack_count: u32 = 0;
2828 for feeder in self.feeders.read().iter() {
2829 // replicaAcksQualify: only electable feeders count (D6).
2830 let qualifies = group
2831 .get_node(&feeder.get_replica_name())
2832 .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2833 .unwrap_or(false);
2834 if !qualifies {
2835 continue;
2836 }
2837 let replica_vlsn = feeder.get_acked_vlsn();
2838 if replica_vlsn <= curr {
2839 continue;
2840 }
2841 if replica_vlsn < min {
2842 min = replica_vlsn;
2843 }
2844 ack_count += 1;
2845 if ack_count >= durable_ack_count {
2846 // A majority of electable replicas hold >= min: durable.
2847 self.update_dtvlsn(min);
2848 return;
2849 }
2850 }
2851 // DTVLSN unchanged.
2852 }
2853
2854 /// REP-9: count qualifying (electable) feeders whose acked high-water VLSN
2855 /// is `>= commit_vlsn`. This is the Rust equivalent of JE
2856 /// `FeederManager.getNumCurrentAckFeeders(commitVLSN)` — the durability
2857 /// quorum is satisfied when this count reaches the required ack count.
2858 /// Only Electable replicas qualify (D6, JE
2859 /// `DurabilityQuorum.replicaAcksQualify`).
2860 fn count_ack_feeders_ge(&self, commit_vlsn: u64) -> u32 {
2861 let group = self.get_rep_group();
2862 let mut count = 0u32;
2863 for feeder in self.feeders.read().iter() {
2864 let qualifies = group
2865 .get_node(&feeder.get_replica_name())
2866 .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2867 .unwrap_or(false);
2868 // A feeder counts only if it has acked a *real* VLSN at or above
2869 // the commit VLSN. `acked_vlsn == 0` is the NULL sentinel (no ack
2870 // yet) and must never satisfy a commit, even when `commit_vlsn`
2871 // itself is 0 (no replicated commit logged) — mirrors JE
2872 // `getReplicaTxnEndVLSN()` returning NULL_VLSN for a fresh feeder,
2873 // which is not `>=` any commit VLSN.
2874 let acked = feeder.get_acked_vlsn();
2875 if qualifies && acked > 0 && acked >= commit_vlsn {
2876 count += 1;
2877 }
2878 }
2879 count
2880 }
2881
2882 /// Set the state change listener.
2883 ///
2884 ///
2885 ///
2886 /// Sets the listener used to receive asynchronous replication node state
2887 /// change events. Note that there is one listener per replication node,
2888 /// not one per handle. Invoking this method adds to the set of listeners.
2889 ///
2890 /// Invoking this method typically results in an immediate callback to the
2891 /// application via the `on_state_change` method, so that the application
2892 /// is made aware of the existing state of the node at the time the listener
2893 /// is first established.
2894 pub fn set_state_change_listener(
2895 &self,
2896 listener: Arc<dyn StateChangeListener>,
2897 ) {
2898 // Immediately notify the listener of the current state
2899 let current_state = self.node_state.get_state();
2900 let event = StateChangeEvent::new(
2901 current_state,
2902 current_state,
2903 self.get_master_name(),
2904 );
2905 listener.on_state_change(event);
2906
2907 let mut listeners = self.listeners.write();
2908 listeners.push(listener);
2909 }
2910
    /// Close the replicated environment.
    ///
    /// Closes this handle and releases any resources. When closed, daemon
    /// threads are stopped, even if they are performing work. The node ceases
    /// participation in the replication group. If the node was currently the
    /// master, the rest of the group will hold an election.
    ///
    /// The ReplicatedEnvironment should not be closed while any other type of
    /// handle that refers to it is not yet closed.
    ///
    /// The call is idempotent: only the first caller performs the teardown;
    /// later calls see the `shutdown` flag already set and return `Ok(())`
    /// immediately.
    ///
    /// Teardown order: mark shutdown, move the state machine to `Shutdown`
    /// and notify listeners, clear feeders, close feeder channels, drop
    /// feeder runners and queues, signal and join I/O threads, persist the
    /// VLSN index (when `env_home` is configured), stop the service
    /// dispatcher.
    ///
    /// # Errors
    ///
    /// The code below only ever returns `Ok(())`; failures during teardown
    /// are logged or ignored rather than propagated.
    ///
    /// # Panics
    ///
    /// The `.unwrap()` calls on the `feeder_channels`, `active_feeder_runners`,
    /// `feeder_queues` and `io_threads` lock results panic if acquiring the
    /// lock fails (presumably on poisoning — confirm against the lock types).
    pub fn close(&self) -> Result<()> {
        // `swap` returns the previous value: `true` means another caller
        // already started (or finished) closing.
        if self.shutdown.swap(true, Ordering::SeqCst) {
            // Already closed
            return Ok(());
        }

        // Captured before the transition so listeners see the real old state.
        let old_state = self.node_state.get_state();

        // Transition to Shutdown state. The state machine allows this from
        // any non-Shutdown state. The result is deliberately ignored.
        let _ = self.node_state.transition_to(NodeState::Shutdown);

        // Notify listeners of the shutdown
        self.notify_listeners(old_state, NodeState::Shutdown);

        // Clear feeders
        {
            let mut feeders = self.feeders.write();
            feeders.clear();
        }

        // C-C2: close all registered feeder channels so FeederRunner threads
        // observe ChannelClosed and exit their run() loops cleanly.
        {
            let channels = self.feeder_channels.lock().unwrap();
            for (name, ch) in channels.iter() {
                // A failed close is only logged; teardown continues.
                if let Err(e) = ch.close() {
                    log::debug!(
                        "close: feeder channel for '{}' already closed: {}",
                        name,
                        e
                    );
                }
            }
        }
        // Drop all active runners and queues so their Arcs release.
        self.active_feeder_runners.lock().unwrap().clear();
        self.feeder_queues.write().unwrap().clear();

        // Signal and join all I/O threads spawned by become_master /
        // become_replica / start_vlsn_persistence_daemon. The vlsn-flush
        // thread does a final flush on its way out so a clean close is
        // recoverable. Closes finding F11.
        self.io_shutdown.store(true, Ordering::SeqCst);
        {
            // NOTE(review): the `io_threads` lock is held for the whole join
            // loop; a joined thread must not need this lock to exit.
            let mut threads = self.io_threads.lock().unwrap();
            for handle in threads.drain(..) {
                // A panicked thread's join error is ignored.
                let _ = handle.join();
            }
        }

        // Belt-and-braces: even when no daemon is running (e.g.
        // `ReplicatedEnvironment::new` without `open`), persist a final
        // snapshot if env_home is configured. A flush failure is logged as a
        // warning and does not fail the close.
        if let Some(ref home) = self.config.env_home
            && let Err(e) =
                crate::vlsn::persist::flush_to_disk(&self.vlsn_index, home)
        {
            log::warn!(
                "close: failed to persist VLSN index to {}: {}",
                home.display(),
                e
            );
        }

        // Stop the service dispatcher (cf. `serviceDispatcher.shutdown()`).
        if let Some(ref dispatcher) = self.tcp_dispatcher {
            dispatcher.stop();
            // Only used to label the log line below.
            let kind = if dispatcher.is_tls() { "TLS" } else { "TCP" };
            log::debug!(
                "Node '{}' {} service dispatcher stopped",
                self.config.node_name.as_str(),
                kind,
            );
        }

        log::info!(
            "Replicated environment '{}' in group '{}' closed",
            self.config.node_name.as_str(),
            self.config.group_name.as_str()
        );

        Ok(())
    }
3006
3007 /// Close this handle and shut down the Replication Group by forcing all
3008 /// active Replicas to exit.
3009 ///
3010 ///
3011 ///
3012 /// This method must be invoked on the node that's currently the Master
3013 /// after all other outstanding handles have been closed.
3014 ///
3015 /// When push-feeder threads are active (registered via
3016 /// [`Self::register_feeder_channel`]), the master first waits up to half
3017 /// of `replica_shutdown_timeout_ms` for each FeederRunner replica to
3018 /// acknowledge all outstanding log entries (VLSN catch-up). Replicas
3019 /// that do not catch up within the budget receive a warning; the master
3020 /// proceeds to send `SHUTDOWN_GROUP` regardless. This closes finding M-4
3021 /// of the v3.x production-readiness review.
3022 ///
3023 /// Replicas that are not fed via a registered channel (pull-based
3024 /// `PeerFeederService` path) are sent `SHUTDOWN_GROUP` without a
3025 /// VLSN-level catch-up wait — that wait requires per-replica ack tracking
3026 /// which the pull path does not yet provide.
3027 pub fn shutdown_group(
3028 &self,
3029 replica_shutdown_timeout_ms: u64,
3030 ) -> Result<()> {
3031 if !self.is_master() {
3032 return Err(RepError::InvalidState(
3033 "shutdownGroup must be invoked on the master".to_string(),
3034 ));
3035 }
3036
3037 log::info!(
3038 "Node '{}' shutting down replication group '{}' (replica_timeout={}ms)",
3039 self.config.node_name.as_str(),
3040 self.config.group_name.as_str(),
3041 replica_shutdown_timeout_ms,
3042 );
3043
3044 // M-4: Wait for active FeederRunner replicas to ack the master's
3045 // current VLSN before sending SHUTDOWN_GROUP. We allow up to half
3046 // the overall timeout for the catch-up phase so the second half
3047 // remains for the SHUTDOWN_GROUP send loop.
3048 let catchup_budget_ms = replica_shutdown_timeout_ms / 2;
3049 if catchup_budget_ms > 0 {
3050 let master_vlsn = self.vlsn_index.get_range().last();
3051 if master_vlsn > 0 {
3052 let runners: Vec<(String, Arc<FeederRunner>)> = self
3053 .active_feeder_runners
3054 .lock()
3055 .unwrap()
3056 .iter()
3057 .map(|(k, v)| (k.clone(), Arc::clone(v)))
3058 .collect();
3059 if !runners.is_empty() {
3060 let catchup_deadline = std::time::Instant::now()
3061 + Duration::from_millis(catchup_budget_ms);
3062 for (name, runner) in &runners {
3063 loop {
3064 let acked = runner.known_replica_vlsn();
3065 if acked >= master_vlsn
3066 || std::time::Instant::now() >= catchup_deadline
3067 {
3068 if acked < master_vlsn {
3069 log::warn!(
3070 "shutdown_group: replica '{}' acked \
3071 VLSN {} < master VLSN {}; proceeding",
3072 name,
3073 acked,
3074 master_vlsn,
3075 );
3076 } else {
3077 log::info!(
3078 "shutdown_group: replica '{}' caught up \
3079 to VLSN {}",
3080 name,
3081 acked,
3082 );
3083 }
3084 break;
3085 }
3086 std::thread::sleep(Duration::from_millis(10));
3087 }
3088 }
3089 }
3090 }
3091 }
3092
3093 // Closes finding F8 of the 2026 review.
3094 //
3095 // Send SHUTDOWN_GROUP to every known peer. The recipient calls
3096 // its own `close()` and the per-connection ADMIN handler
3097 // returns ACK_OK. Any peer that doesn't ack within the
3098 // timeout is logged and the master proceeds. After signalling
3099 // every peer, the master closes its own env.
3100 let deadline = std::time::Instant::now()
3101 + Duration::from_millis(replica_shutdown_timeout_ms);
3102
3103 for peer in self.group_service.get_all_nodes() {
3104 if peer.name == self.config.node_name {
3105 continue;
3106 }
3107 // Don't exceed the deadline waiting for any single peer.
3108 let now = std::time::Instant::now();
3109 if now >= deadline {
3110 log::warn!(
3111 "shutdown_group: deadline reached; skipping remaining peers"
3112 );
3113 break;
3114 }
3115 let addr_str = format!("{}:{}", peer.host, peer.port);
3116 let addr = match addr_str.parse::<SocketAddr>() {
3117 Ok(a) => a,
3118 Err(e) => {
3119 log::warn!(
3120 "shutdown_group: peer '{}' has bad address {}: {}",
3121 peer.name,
3122 addr_str,
3123 e
3124 );
3125 continue;
3126 }
3127 };
3128 match crate::group_admin::send_shutdown_group(addr) {
3129 Ok(true) => log::info!(
3130 "shutdown_group: peer '{}' acknowledged",
3131 peer.name
3132 ),
3133 Ok(false) => log::warn!(
3134 "shutdown_group: peer '{}' rejected the request",
3135 peer.name
3136 ),
3137 Err(e) => log::warn!(
3138 "shutdown_group: peer '{}' unreachable: {}",
3139 peer.name,
3140 e
3141 ),
3142 }
3143 }
3144
3145 // Master closes itself last.
3146 self.close()
3147 }
3148
    /// Returns `true` once shutdown has started (or completed).
    ///
    /// Reads the `shutdown` flag, which [`Self::close`] sets before it
    /// performs any teardown work, so this reports `true` while a close is
    /// still in progress as well as after it has finished.
    pub fn is_shutdown(&self) -> bool {
        self.shutdown.load(Ordering::SeqCst)
    }
3153
3154 /// Notify all registered listeners of a state change.
3155 fn notify_listeners(&self, old_state: NodeState, new_state: NodeState) {
3156 let listeners = self.listeners.read();
3157 if !listeners.is_empty() {
3158 let event = StateChangeEvent::new(
3159 old_state,
3160 new_state,
3161 self.get_master_name(),
3162 );
3163 for listener in listeners.iter() {
3164 listener.on_state_change(event.clone());
3165 }
3166 }
3167 }
3168}
3169
3170// ---------------------------------------------------------------------------
3171// F1: ReplicaAckCoordinator impl wires master commits into the AckTracker.
3172// ---------------------------------------------------------------------------
3173//
3174// `noxu_db::Transaction::commit_with_durability` calls
3175// `await_replica_acks` after the local WAL fsync. This impl:
3176//
// 1. Rejects calls during shutdown with `Shutdown`.
// 2. Rejects calls on a non-master node with `NotMaster`.
// 3. Computes the required ack count from `electable_count` and the
//    requested policy.
// 4. Uses the latest assigned VLSN as the commit VLSN, registers the ack
//    requirement on the `AckTracker`, and waits in
//    `AckTracker::wait_for_predicate` until enough electable feeders have
//    acked that VLSN, the timeout elapses, or shutdown is signalled.
// 5. Cleans up the tracker entry on every exit path after registration.
3186//
3187// Closes finding F1 of the 2026 review.
3188impl ReplicaAckCoordinator for ReplicatedEnvironment {
3189 fn await_replica_acks(
3190 &self,
3191 policy: ReplicaAckPolicyKind,
3192 timeout: Duration,
3193 ) -> std::result::Result<u32, AckWaitError> {
3194 // Fast-path: ReplicaAckPolicy::None never blocks. The trait spec
3195 // says callers may already short-circuit, but be defensive.
3196 if matches!(policy, ReplicaAckPolicyKind::None) {
3197 return Ok(0);
3198 }
3199
3200 if self.is_shutdown() {
3201 return Err(AckWaitError {
3202 kind: AckWaitErrorKind::Shutdown,
3203 needed: 0,
3204 received: 0,
3205 });
3206 }
3207
3208 if !self.is_master() {
3209 return Err(AckWaitError {
3210 kind: AckWaitErrorKind::NotMaster,
3211 needed: 0,
3212 received: 0,
3213 });
3214 }
3215
3216 // Count electable peers (excluding the master) using the
3217 // RepGroup view, which counts Arbiters and Electables
3218 // identically. Only Electable nodes are counted as data
3219 // replicas able to ack a commit. The master itself is
3220 // *implicit*: it is not registered in `group_service` (only
3221 // peers are), so we add 1 to obtain the total electable
3222 // count expected by `ReplicaAckPolicyKind::required_acks`.
3223 let group = self.get_rep_group();
3224 let electable_peers: u32 = group
3225 .get_nodes()
3226 .iter()
3227 .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
3228 .count() as u32;
3229 let electable_count: u32 = electable_peers + 1; // +1 for self/master
3230
3231 let needed = policy.required_acks(electable_count);
3232 if needed == 0 {
3233 // Single-node group, or All with only the master itself.
3234 return Ok(0);
3235 }
3236
3237 // REP-9 Part 2: the commit's VLSN is the key. The master assigns a
3238 // VLSN when it logs the TxnCommit (via the shared `wal_vlsn_counter`
3239 // bumped in `EnvironmentImpl::log_txn_commit`), immediately before
3240 // this gate runs. The latest assigned VLSN therefore IS this
3241 // commit's VLSN (the trait contract: "implementations are responsible
3242 // for assigning the commit VLSN internally"). We wait until a quorum
3243 // of qualifying electable replicas have acked a VLSN >= the commit
3244 // VLSN — faithful to JE `FeederManager.getNumCurrentAckFeeders`, which
3245 // counts feeders whose `getReplicaTxnEndVLSN() >= commitVLSN` (a
3246 // high-water `>=` test, NOT an exact-VLSN match).
3247 //
3248 // ponytail: reads the global high-water VLSN, so a concurrent later
3249 // commit can make this gate wait on a slightly higher VLSN than its
3250 // own. That is strictly SAFE (waiting for >= a newer VLSN never
3251 // returns early) and only marginally less precise; thread the
3252 // per-txn VLSN through the trait if exact per-commit granularity is
3253 // ever needed.
3254 let commit_vlsn = self.wal_vlsn_counter.load(Ordering::Acquire);
3255
3256 // Register on the AckTracker too: this is what `record_ack` notifies,
3257 // so the condvar wakes us as acks land. The satisfaction decision
3258 // itself is the high-water feeder count below.
3259 self.ack_tracker.register(commit_vlsn, needed);
3260
3261 // Block on the ack condvar until a quorum of electable feeders hold
3262 // the commit VLSN, the timeout elapses, or shutdown is signalled — no
3263 // spin-poll (JE FeederTxns.TxnInfo uses a per-transaction
3264 // CountDownLatch.await; the AckTracker condvar is the shared-mutex
3265 // equivalent). record_ack notifies us as acks arrive.
3266 let satisfied = self.ack_tracker.wait_for_predicate(
3267 timeout,
3268 || self.count_ack_feeders_ge(commit_vlsn) >= needed,
3269 || self.is_shutdown(),
3270 );
3271 if satisfied {
3272 self.ack_tracker.cleanup_through(commit_vlsn);
3273 return Ok(needed);
3274 }
3275 if self.is_shutdown() {
3276 self.ack_tracker.cleanup_through(commit_vlsn);
3277 return Err(AckWaitError {
3278 kind: AckWaitErrorKind::Shutdown,
3279 needed,
3280 received: 0,
3281 });
3282 }
3283 // Timed out: report the partial ack count (qualifying electable
3284 // feeders holding the commit VLSN) so the caller can surface
3285 // InsufficientReplicas.
3286 let received = self.count_ack_feeders_ge(commit_vlsn);
3287 self.ack_tracker.cleanup_through(commit_vlsn);
3288 Err(AckWaitError { kind: AckWaitErrorKind::Timeout, needed, received })
3289 }
3290
3291 /// X-3: allocate the next VLSN for a recovered XA commit and register
3292 /// `lsn` in the VLSN index so feeders can stream the commit.
3293 ///
3294 /// Increments off the current latest VLSN so the new VLSN is strictly
3295 /// monotonically increasing. In a single-node or master-less environment
3296 /// (not master) returns 0 (NULL_VLSN — harmless, the default).
3297 fn alloc_vlsn_for_recovered_commit(&self, lsn: noxu_util::Lsn) -> u64 {
3298 // Only allocate a VLSN when we are the master; on a replica the
3299 // recovered XA should have been replicated by the original master.
3300 if !self.is_master() {
3301 return 0;
3302 }
3303 let next_vlsn = self.vlsn_index.get_latest_vlsn() + 1;
3304 // A recovered XA commit is a commit log entry; dispatch as TxnCommit
3305 // so lastTxnEnd/lastSync advance (REP-5).
3306 self.vlsn_index.register_with_type(
3307 next_vlsn,
3308 lsn.file_number(),
3309 lsn.file_offset(),
3310 noxu_log::LogEntryType::TxnCommit,
3311 );
3312 log::debug!(
3313 "alloc_vlsn_for_recovered_commit: allocated vlsn={} for lsn={:?}",
3314 next_vlsn,
3315 lsn
3316 );
3317 next_vlsn
3318 }
3319
3320 /// R-3: pre-allocate the next commit VLSN WITHOUT registering in the index.
3321 ///
3322 /// The caller writes the `TxnCommit` WAL entry with this VLSN embedded,
3323 /// then calls `register_recovered_commit_vlsn` with the actual commit LSN.
3324 /// This two-step approach ensures the WAL entry carries the VLSN so the
3325 /// X-14 VLSN rebuild on second crash can find it.
3326 fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64 {
3327 if !self.is_master() {
3328 return 0;
3329 }
3330 // Peek at the next VLSN without registering. The actual registration
3331 // happens in register_recovered_commit_vlsn() after the WAL write.
3332 self.vlsn_index.get_latest_vlsn() + 1
3333 }
3334
3335 /// R-3: register a pre-allocated VLSN in the VLSN index with the actual
3336 /// commit LSN. Called after writing the `TxnCommit` WAL entry.
3337 fn register_recovered_commit_vlsn(
3338 &self,
3339 vlsn: u64,
3340 commit_lsn: noxu_util::Lsn,
3341 ) {
3342 if vlsn == 0 || !self.is_master() {
3343 return;
3344 }
3345 // The pre-allocated VLSN is for a TxnCommit WAL entry; dispatch the
3346 // type so lastTxnEnd/lastSync advance (REP-5).
3347 self.vlsn_index.register_with_type(
3348 vlsn,
3349 commit_lsn.file_number(),
3350 commit_lsn.file_offset(),
3351 noxu_log::LogEntryType::TxnCommit,
3352 );
3353 log::debug!(
3354 "register_recovered_commit_vlsn: registered vlsn={} for commit_lsn={:?}",
3355 vlsn,
3356 commit_lsn
3357 );
3358 }
3359}
3360
3361#[cfg(test)]
3362mod tests {
3363 use super::*;
3364 use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};
3365
3366 /// Helper to create a test config with a fixed port (unit-test style,
3367 /// no real TCP bind needed — hostname "localhost" resolves but the port
3368 /// might be in use; use `test_config_port0` for real TCP tests).
3369 fn test_config(node_name: &str) -> RepConfig {
3370 RepConfig::builder("test_group", node_name, "localhost")
3371 .node_port(5001)
3372 .build()
3373 }
3374
3375 /// Helper to create a test config that binds to an OS-assigned port.
3376 fn test_config_port0(node_name: &str) -> RepConfig {
3377 RepConfig::builder("test_group", node_name, "127.0.0.1")
3378 .node_port(0)
3379 .build()
3380 }
3381
3382 #[test]
3383 fn test_initial_state_is_detached() {
3384 let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3385 // NodeStateMachine starts in Detached state
3386 assert_eq!(env.get_state(), NodeState::Detached);
3387 assert!(!env.is_master());
3388 assert!(!env.is_replica());
3389 assert!(!env.is_active());
3390 }
3391
3392 #[test]
3393 fn test_become_master() {
3394 let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3395 env.become_master(1).unwrap();
3396 assert_eq!(env.get_state(), NodeState::Master);
3397 assert!(env.is_master());
3398 assert!(!env.is_replica());
3399 assert!(env.is_active());
3400 }
3401
3402 #[test]
3403 fn test_become_replica() {
3404 let env = ReplicatedEnvironment::new(test_config("node2")).unwrap();
3405 env.become_replica("node1").unwrap();
3406 assert_eq!(env.get_state(), NodeState::Replica);
3407 assert!(!env.is_master());
3408 assert!(env.is_replica());
3409 assert!(env.is_active());
3410 }
3411
3412 #[test]
3413 fn test_get_node_name() {
3414 let env = ReplicatedEnvironment::new(test_config("my_node")).unwrap();
3415 assert_eq!(env.get_node_name(), "my_node");
3416 }
3417
3418 #[test]
3419 fn test_get_group_name() {
3420 let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3421 assert_eq!(env.get_group_name(), "test_group");
3422 }
3423
/// Registering VLSNs 1..=3 moves the current VLSN to 3 and yields a range
/// whose first and last are 1 and 3.
#[test]
fn test_register_vlsn_updates_index() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

    node.register_vlsn(1, 0, 100);
    node.register_vlsn(2, 0, 200);
    node.register_vlsn(3, 0, 300);

    assert_eq!(node.get_current_vlsn(), 3);

    let vlsn_range = node.get_vlsn_range();
    assert_eq!(vlsn_range.first(), 1);
    assert_eq!(vlsn_range.last(), 3);
}
3436
/// An ack recorded from a registered electable peer satisfies a pending
/// one-ack requirement in the ack tracker.
#[test]
fn test_record_ack() {
    use crate::node_type::NodeType;
    use crate::rep_node::RepNode;

    let master = ReplicatedEnvironment::new(test_config("master")).unwrap();
    master.become_master(1).unwrap();

    // replicaAcksQualify: only ELECTABLE replicas count toward durability,
    // so the replica must be a known electable member of the group.
    let electable_peer = RepNode::new(
        "replica1".to_string(),
        NodeType::Electable,
        "127.0.0.1".to_string(),
        6001,
        2,
    );
    master.add_peer(electable_peer).unwrap();

    master.register_vlsn(1, 0, 100);

    // Register the pending requirement first, then deliver the ack.
    master.get_ack_tracker().register(1, 1);
    master.record_ack(1, "replica1");

    // The requirement must now be satisfied.
    assert!(master.get_ack_tracker().is_satisfied(1));
}
3461
/// Acks from a Monitor peer, and from a name that was never added as a
/// peer, leave the pending ack requirement unsatisfied.
#[test]
fn test_record_ack_from_non_electable_does_not_qualify() {
    use crate::node_type::NodeType;
    use crate::rep_node::RepNode;

    let master = ReplicatedEnvironment::new(test_config("master")).unwrap();
    master.become_master(1).unwrap();

    // A Monitor is NOT electable -> its ack must not count (JE
    // DurabilityQuorum.replicaAcksQualify).
    let monitor_peer = RepNode::new(
        "monitor1".to_string(),
        NodeType::Monitor,
        "127.0.0.1".to_string(),
        6002,
        3,
    );
    master.add_peer(monitor_peer).unwrap();

    master.register_vlsn(1, 0, 100);
    master.get_ack_tracker().register(1, 1);

    master.record_ack(1, "monitor1");
    assert!(
        !master.get_ack_tracker().is_satisfied(1),
        "non-electable ack must not satisfy durability quorum"
    );

    // An unknown replica likewise does not qualify.
    master.record_ack(1, "ghost");
    assert!(!master.get_ack_tracker().is_satisfied(1));
}
3489
/// Checks `authoritative_quorum_met(connected_replicas, electable_total)`
/// against a table of expected results for 1-, 3- and 5-node groups.
#[test]
fn test_authoritative_quorum_met() {
    // (connected electable replicas, electable total, expected result)
    let cases = [
        // 1-node group (quorum_size = 1/2+1 = 1): the master alone IS
        // authoritative (0 replicas + 1 >= 1).
        (0, 1, true),
        // 3-node group (quorum_size = 2): a master with 0 connected
        // replicas is the minority -> NOT authoritative.
        (0, 3, false),
        // 3-node group with 1 connected electable replica: 1+1=2 >= 2.
        (1, 3, true),
        // 5-node group (quorum_size = 3): 2 connected replicas are needed.
        (1, 5, false),
        (2, 5, true),
    ];

    for (connected, electable_total, expected) in cases {
        assert_eq!(
            ReplicatedEnvironment::authoritative_quorum_met(connected, electable_total),
            expected
        );
    }
}
3504
/// `is_authoritative_master` is false before the node becomes master and
/// true for a master with no peers added.
#[test]
fn test_is_authoritative_master_requires_master_role() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

    // A non-master is never authoritative, regardless of connections.
    assert!(!node.is_master());
    assert!(!node.is_authoritative_master());

    // A single-node master (no peers) IS authoritative.
    node.become_master(1).unwrap();
    assert!(node.is_authoritative_master());
}
3515
/// The DTVLSN only moves forward, through both `update_dtvlsn` and
/// `set_dtvlsn`; lower or equal candidates leave it unchanged.
#[test]
fn test_dtvlsn_update_max_advances_only() {
    let node = ReplicatedEnvironment::new(test_config("master")).unwrap();
    assert_eq!(node.get_dtvlsn(), 0);

    // First advance: 0 -> 10.
    assert_eq!(node.update_dtvlsn(10), 10);
    assert_eq!(node.get_dtvlsn(), 10);

    // A lower candidate must not move it backward.
    assert_eq!(node.update_dtvlsn(5), 10);
    assert_eq!(node.get_dtvlsn(), 10);

    // An equal candidate is a no-op.
    assert_eq!(node.update_dtvlsn(10), 10);

    // set_dtvlsn (replica path) is also advance-only.
    node.set_dtvlsn(7);
    assert_eq!(node.get_dtvlsn(), 10);
    node.set_dtvlsn(20);
    assert_eq!(node.get_dtvlsn(), 20);
}
3533
/// With three electable peers whose feeders have acked VLSNs 100, 80 and
/// 50, `update_dtvlsn_from_feeders` must bring the DTVLSN to at least 80.
#[test]
fn test_dtvlsn_majority_min_across_feeders() {
    use crate::node_type::NodeType;
    use crate::rep_node::RepNode;

    let master = ReplicatedEnvironment::new(test_config("master")).unwrap();
    master.become_master(1).unwrap();

    // Three electable replicas → electable_count = 4 (incl. master) →
    // durable_ack_count = 2. With master self-ack, DTVLSN advances to the
    // min of the 2 highest qualifying feeders that exceed the current
    // DTVLSN.
    let peers = [("r1", 6100u16, 2u32), ("r2", 6101, 3), ("r3", 6102, 4)];
    for (peer_name, port, node_id) in peers {
        let peer = RepNode::new(
            peer_name.to_string(),
            NodeType::Electable,
            "127.0.0.1".to_string(),
            port,
            node_id,
        );
        master.add_peer(peer).unwrap();
    }

    // Register feeders with differing acked VLSNs: r1=100, r2=80, r3=50.
    for (peer_name, acked_vlsn) in [("r1", 100u64), ("r2", 80), ("r3", 50)] {
        let feeder = crate::stream::feeder::Feeder::new(peer_name.to_string());
        feeder.record_ack(acked_vlsn);
        master.feeders.write().push(feeder);
    }

    master.update_dtvlsn_from_feeders();

    // First two qualifying feeders encountered are r1(100), r2(80);
    // min(100,80)=80 and that is a majority (2 of 4) → DTVLSN = 80.
    // (r3=50 < 80 is not required for durability.)
    assert!(
        master.get_dtvlsn() >= 80,
        "DTVLSN must reach the majority-min (>=80), got {}",
        master.get_dtvlsn()
    );
}
3570
/// `close` flips `is_shutdown` to true and moves the state to `Shutdown`.
#[test]
fn test_close_sets_shutdown() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    assert!(!node.is_shutdown());

    node.close().unwrap();

    assert!(node.is_shutdown());
    // Once closed, the reported state is Shutdown.
    assert_eq!(node.get_state(), NodeState::Shutdown);
}
3581
/// Calling `close` twice succeeds both times and leaves the node shut down.
#[test]
fn test_close_is_idempotent() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

    node.close().unwrap();
    // The second close must not return an error.
    node.close().unwrap();

    assert!(node.is_shutdown());
}
3589
/// `become_master` returns an error once the environment has been closed.
#[test]
fn test_cannot_become_master_when_shutdown() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.close().unwrap();

    assert!(node.become_master(1).is_err());
}
3598
/// `become_replica` returns an error once the environment has been closed.
#[test]
fn test_cannot_become_replica_when_shutdown() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.close().unwrap();

    assert!(node.become_replica("master").is_err());
}
3607
/// `apply_entry` returns an error once the environment has been closed.
#[test]
fn test_cannot_apply_entry_when_shutdown() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.close().unwrap();

    assert!(node.apply_entry(1, 0, vec![1, 2, 3]).is_err());
}
3616
/// `transfer_master` returns an error when the local node is a replica.
#[test]
fn test_cannot_transfer_master_when_not_master() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.become_replica("other").unwrap();

    let transfer =
        MasterTransferConfig::new("target_node".to_string(), Duration::from_secs(30));

    assert!(node.transfer_master(transfer).is_err());
}
3629
/// `transfer_master` on a master returns an error when the target node
/// was never registered as a peer.
#[test]
fn test_transfer_master_requires_registered_target() {
    // F7: transfer_master is no longer a no-op; it sends an ADMIN
    // TRANSFER_MASTER signal to the target via TCP. An unregistered
    // target is rejected at the address-resolution step.
    let master = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    master.become_master(1).unwrap();

    let transfer =
        MasterTransferConfig::new("unknown_target".to_string(), Duration::from_secs(30));
    let outcome = master.transfer_master(transfer);

    assert!(
        outcome.is_err(),
        "transfer_master to unregistered target must error"
    );
}
3648
/// Applying entries with VLSNs 1 and 2 on a replica leaves the current
/// VLSN at 2.
#[test]
fn test_apply_entry_registers_vlsn() {
    let replica = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    replica.become_replica("master").unwrap();

    replica.apply_entry(1, 0, vec![1, 2, 3]).unwrap();
    replica.apply_entry(2, 0, vec![4, 5, 6]).unwrap();

    assert_eq!(replica.get_current_vlsn(), 2);
}
3659
/// No master name is known at construction; after `become_master` the
/// master name is the local node's name.
#[test]
fn test_master_name_tracking() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

    // Nothing has been elected yet.
    assert!(node.get_master_name().is_none());

    // Once this node is master, it reports itself as the master.
    node.become_master(1).unwrap();
    assert_eq!(node.get_master_name().as_deref(), Some("node1"));
}
3671
/// A master can move to replica; the tracked master name then switches to
/// the name passed to `become_replica`.
#[test]
fn test_master_to_replica_transition() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

    // Start out as master.
    node.become_master(1).unwrap();
    assert_eq!(node.get_master_name().as_deref(), Some("node1"));

    // Master -> Replica is a valid transition.
    node.become_replica("other_master").unwrap();
    assert_eq!(node.get_master_name().as_deref(), Some("other_master"));
    assert!(node.is_replica());
}
3685
/// A state-change listener is notified once when it is installed and again
/// when the node becomes master; the latter event carries `Master`.
#[test]
fn test_state_change_listener_notification() {
    /// Counts notifications and remembers the most recent new state.
    struct RecordingListener {
        notifications: AtomicU32,
        latest_state: noxu_sync::Mutex<Option<NodeState>>,
    }

    impl StateChangeListener for RecordingListener {
        fn on_state_change(&self, event: StateChangeEvent) {
            self.notifications.fetch_add(1, AtomicOrdering::SeqCst);
            *self.latest_state.lock() = Some(event.new_state);
        }
    }

    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    let recorder = Arc::new(RecordingListener {
        notifications: AtomicU32::new(0),
        latest_state: noxu_sync::Mutex::new(None),
    });

    // Installing the listener triggers an immediate notification.
    node.set_state_change_listener(recorder.clone());
    assert_eq!(recorder.notifications.load(AtomicOrdering::SeqCst), 1);

    // A real state change triggers a second one.
    node.become_master(1).unwrap();
    assert_eq!(recorder.notifications.load(AtomicOrdering::SeqCst), 2);
    assert_eq!(*recorder.latest_state.lock(), Some(NodeState::Master));
}
3715
/// The installed listener sees a `Shutdown` event only after `close`, not
/// after `become_master`.
#[test]
fn test_close_notifies_listeners() {
    /// Latches to true the first time a `Shutdown` event is delivered.
    struct ShutdownProbe {
        saw_shutdown: AtomicBool,
    }

    impl StateChangeListener for ShutdownProbe {
        fn on_state_change(&self, event: StateChangeEvent) {
            if event.new_state == NodeState::Shutdown {
                self.saw_shutdown.store(true, AtomicOrdering::SeqCst);
            }
        }
    }

    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    let probe = Arc::new(ShutdownProbe {
        saw_shutdown: AtomicBool::new(false),
    });

    // The initial notification reports the current (Detached) state.
    node.set_state_change_listener(probe.clone());

    // Become master first so the close transition is meaningful.
    node.become_master(1).unwrap();
    assert!(!probe.saw_shutdown.load(AtomicOrdering::SeqCst));

    node.close().unwrap();
    assert!(probe.saw_shutdown.load(AtomicOrdering::SeqCst));
}
3745
/// `shutdown_group` returns an error when called on a replica.
#[test]
fn test_shutdown_group_requires_master() {
    let replica = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    replica.become_replica("other").unwrap();

    assert!(replica.shutdown_group(5000).is_err());
}
3754
/// `shutdown_group` succeeds on a master and leaves the node shut down.
#[test]
fn test_shutdown_group_as_master() {
    let master = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    master.become_master(1).unwrap();

    assert!(master.shutdown_group(5000).is_ok());
    assert!(master.is_shutdown());
}
3764
/// `get_config` exposes the node and group names the environment was
/// built with.
#[test]
fn test_get_config() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

    assert_eq!(node.get_config().node_name, "node1");
    assert_eq!(node.get_config().group_name, "test_group");
}
3771
/// Smoke test: `get_stats` can be called on a fresh environment without
/// panicking. The returned value is not inspected.
#[test]
fn test_get_stats() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    let _snapshot = node.get_stats();
}
3778
3779 // -----------------------------------------------------------------------
3780 // TCP dispatcher tests (H-5 / H-7)
3781 // -----------------------------------------------------------------------
3782
/// Constructing an environment with node port 0 yields a bound address
/// whose port is non-zero.
#[test]
fn test_tcp_dispatcher_starts_on_new() {
    // Port 0 lets the OS assign an ephemeral port.
    let node = ReplicatedEnvironment::new(test_config_port0("tcp_node")).unwrap();

    // The dispatcher must have started and bound a real port.
    let Some(bound) = node.bound_addr() else {
        panic!("expected a bound address");
    };
    assert_ne!(bound.port(), 0, "OS should assign a non-zero port");
}
3794
/// The TCP dispatcher reports running after construction and not running
/// after `close`.
#[test]
fn test_tcp_dispatcher_stops_on_close() {
    let node = ReplicatedEnvironment::new(test_config_port0("tcp_node2")).unwrap();

    // A missing dispatcher counts as "not running".
    let dispatcher_running = || node.tcp_dispatcher.as_ref().is_some_and(|d| d.is_running());

    // The dispatcher is up right after construction.
    assert!(dispatcher_running());

    node.close().unwrap();

    // After close, the dispatcher must be stopped.
    assert!(
        !dispatcher_running(),
        "dispatcher should be stopped after close"
    );
}
3818
/// Registers a `"ping"` echo handler on the running dispatcher, connects to
/// it over TCP, and checks that the message is echoed and the handler ran
/// exactly once.
///
/// The dispatcher is obtained with `expect` rather than `if let`, so a
/// missing dispatcher fails the test instead of skipping every assertion.
#[test]
fn test_tcp_dispatcher_accepts_connection() {
    use crate::net::Channel;
    use crate::net::ServiceHandler;
    use crate::net::service_dispatcher::connect_to_service;
    use std::sync::atomic::{AtomicU32, Ordering as AO};
    use std::time::Duration;

    /// Echo service that counts how many connections it has handled.
    struct PingHandler {
        count: AtomicU32,
    }
    impl ServiceHandler for PingHandler {
        fn service_name(&self) -> &str {
            "ping"
        }
        fn handle(&self, ch: Box<dyn Channel>) -> crate::error::Result<()> {
            // Count first, so the counter is already updated by the time
            // the client sees the echoed reply.
            self.count.fetch_add(1, AO::SeqCst);
            // Echo the first message back.
            if let Ok(Some(msg)) = ch.receive(Duration::from_secs(2)) {
                let _ = ch.send(&msg);
            }
            Ok(())
        }
    }

    let env = ReplicatedEnvironment::new(test_config_port0("tcp_node3")).unwrap();
    let addr = env.bound_addr().expect("dispatcher must be bound");

    // Fail loudly if the dispatcher is absent; silently skipping the body
    // would let this test pass without exercising anything.
    let disp = env
        .tcp_dispatcher
        .as_ref()
        .expect("TCP dispatcher must exist when an address is bound");

    // Register a ping handler on the running dispatcher.
    let handler = Arc::new(PingHandler { count: AtomicU32::new(0) });
    disp.register("ping", handler.clone());

    // Give the accept thread a moment.
    std::thread::sleep(Duration::from_millis(20));

    let client = connect_to_service(addr, "ping").unwrap();
    client.send(b"hello").unwrap();
    let reply = client.receive(Duration::from_secs(2)).unwrap();
    assert_eq!(reply, Some(b"hello".to_vec()));

    assert_eq!(handler.count.load(AO::SeqCst), 1);

    env.close().unwrap();
}
3866
/// `become_master` succeeds directly from `Detached` and ends in `Master`.
#[test]
fn test_become_master_auto_transitions_from_detached() {
    // The state machine requires Detached -> Unknown -> Master;
    // become_master() is expected to take the intermediate step itself.
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    assert_eq!(node.get_state(), NodeState::Detached);

    node.become_master(1).unwrap();

    assert_eq!(node.get_state(), NodeState::Master);
}
3876
/// `become_replica` succeeds directly from `Detached` and ends in `Replica`.
#[test]
fn test_become_replica_auto_transitions_from_detached() {
    // The state machine requires Detached -> Unknown -> Replica;
    // become_replica() is expected to take the intermediate step itself.
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    assert_eq!(node.get_state(), NodeState::Detached);

    node.become_replica("master_node").unwrap();

    assert_eq!(node.get_state(), NodeState::Replica);
}
3886
/// `transfer_master` returns an error on a node that was master and has
/// since been closed.
#[test]
fn test_cannot_transfer_master_when_shutdown() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.become_master(1).unwrap();
    node.close().unwrap();

    let transfer = MasterTransferConfig::new("target".to_string(), Duration::from_secs(30));

    assert!(node.transfer_master(transfer).is_err());
}
3900
/// Walks one node through Detached -> Master -> Replica -> closed,
/// registering VLSNs, recording acks and applying an entry along the way.
#[test]
fn test_full_lifecycle() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

    // Phase 1: starts out detached.
    assert_eq!(node.get_state(), NodeState::Detached);

    // Phase 2: becomes master.
    node.become_master(1).unwrap();
    assert!(node.is_master());

    // Register some VLSNs while master.
    node.register_vlsn(1, 0, 100);
    node.register_vlsn(2, 0, 200);

    // Record acks from a replica for both VLSNs.
    node.record_ack(1, "replica1");
    node.record_ack(2, "replica1");

    // Phase 3: transitions to replica (simulating failover).
    node.become_replica("node2").unwrap();
    assert!(node.is_replica());

    // Apply an entry coming from the new master.
    node.apply_entry(3, 0, vec![7, 8, 9]).unwrap();

    // Phase 4: close.
    node.close().unwrap();
    assert!(node.is_shutdown());
}
3931
/// Verify that `with_environment` lazily registers the RESTORE service on
/// the TCP dispatcher when `config.env_home` was not set at construction.
///
/// This mirrors `RepNode.envSetup()`, which registers the restore handler
/// when the environment is wired into the replicated node.
#[test]
fn test_restore_registered_lazily_via_with_environment() {
    use noxu_dbi::EnvironmentImpl;
    use std::sync::atomic::Ordering::SeqCst;
    use tempfile::TempDir;

    let home = TempDir::new().expect("temp dir");

    // No env_home in the config: the dispatcher starts, but there is no
    // RESTORE handler yet.
    let rep_env = ReplicatedEnvironment::new(
        RepConfig::builder("test_group", "node1", "127.0.0.1")
            .node_port(0)
            .build(),
    )
    .unwrap();

    // Not yet registered.
    assert!(!rep_env.restore_registered.load(SeqCst));

    // Wire in a real EnvironmentImpl so get_env_home() returns the temp dir.
    let backing_env =
        Arc::new(EnvironmentImpl::new(home.path(), false, false).expect("open env"));
    rep_env.with_environment(backing_env);

    // Now the RESTORE service must be registered.
    assert!(rep_env.restore_registered.load(SeqCst));
}
3971
/// Verify that the RESTORE service is registered as soon as the
/// environment is constructed when `config.env_home` is set, i.e. the
/// registration is not deferred.
#[test]
fn test_restore_registered_eagerly_when_env_home_in_config() {
    use std::sync::atomic::Ordering::SeqCst;
    use tempfile::TempDir;

    let home = TempDir::new().expect("temp dir");

    let rep_env = ReplicatedEnvironment::new(
        RepConfig::builder("test_group", "node2", "127.0.0.1")
            .node_port(0)
            .env_home(home.path())
            .build(),
    )
    .unwrap();

    // env_home was in the config, so registration happens immediately.
    assert!(rep_env.restore_registered.load(SeqCst));
}
3994}