noxu_rep/replicated_environment.rs
1//! The main replicated environment API.
2//!
3//!
4//! A replicated database environment that is a node in a replication group.
5//! This is the entry point for replication. It wraps a standard Environment
6//! and adds replication capabilities including master election, replica
7//! streaming, and commit acknowledgments.
8//!
9//! # Replication node states
10//!
11//! The replication node state determines the operations that the application
12//! can perform against its replicated environment. The state transitions
13//! visible to the application can be summarized by the regular expression:
14//!
15//! ```text
16//! [ MASTER | REPLICA | UNKNOWN ]+ DETACHED
17//! ```
18//!
19//! When the first handle to a `ReplicatedEnvironment` is created and the node
20//! is brought up, the node usually establishes Master or Replica state. These
21//! states are preceded by the Unknown state. As various remote nodes become
22//! unavailable and elections are held, the local node may change between
23//! Master and Replica states, always with a (usually brief) transition through
24//! Unknown state.
25//!
26//! When the environment is closed, the node transitions to the Detached state.
27
28use noxu_dbi::{
29 AckWaitError, AckWaitErrorKind, EnvironmentImpl, ReplicaAckCoordinator,
30 ReplicaAckPolicyKind,
31};
32use noxu_sync::RwLock;
33use std::net::SocketAddr;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::OnceLock;
37use std::sync::Weak;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use crate::ack_tracker::AckTracker;
42use crate::elections::election_service::{
43 ELECTION_SERVICE_NAME, ElectionAcceptorState, ElectionService,
44};
45use crate::elections::master_tracker::MasterTracker;
46use crate::error::{RepError, Result};
47use crate::group_service::GroupService;
48use crate::master_transfer::MasterTransferConfig;
49use crate::net::service_dispatcher::{
50 AnyServiceDispatcher, TcpServiceDispatcher,
51};
52use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
53use crate::network_restore_server::{
54 NetworkRestoreServer, RESTORE_SERVICE_NAME,
55};
56use crate::node_state::{NodeState, NodeStateMachine};
57use crate::rep_config::RepConfig;
58use crate::rep_stats::RepStats;
59use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
60use crate::stream::feeder::EnvironmentLogScanner;
61use crate::stream::feeder::Feeder;
62use crate::stream::feeder::FeederRunner;
63use crate::stream::peer_feeder::PeerScannerAdapter;
64use crate::stream::peer_feeder::{
65 PEER_FEEDER_SERVICE_NAME, PeerFeederService, PeerLogScanner,
66};
67use crate::stream::replica_stream::{EnvironmentLogWriter, ReplicaStream};
68use crate::stream::syncup::{
69 Matchpoint, RollbackDecision, find_matchpoint, verify_rollback,
70};
71use crate::stream::syncup_reader::VlsnIndexView;
72use crate::vlsn::vlsn_index::VlsnIndex;
73use crate::vlsn::vlsn_range::VlsnRange;
74use std::collections::HashMap;
75
/// Heartbeat timeout used for master liveness detection (30 seconds).
///
/// Passed to `MasterTracker::new` when a `ReplicatedEnvironment` is built.
const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(30_000);
78
79/// A replicated database environment.
80///
81///
82///
83/// This is the entry point for replication. It wraps a standard Environment
84/// and adds replication capabilities including master election, replica
85/// streaming, and commit acknowledgments.
86///
87/// High Availability (HA) provides a replicated, embedded database
88/// management system which provides fast, reliable, and scalable data
89/// management. HA enables replication of an environment across a Replication
90/// Group. A `ReplicatedEnvironment` is a single node in the replication group.
91///
92/// `ReplicatedEnvironment` wraps a standard `Environment`. All database
93/// operations are executed in the same fashion in both replicated and
94/// non-replicated applications. A `ReplicatedEnvironment` must be
95/// transactional. All replicated databases created in the replicated
96/// environment must be transactional as well.
97///
98/// A `ReplicatedEnvironment` joins its replication group when it is created.
99/// When `new()` returns, the node will have established contact with the other
100/// members of the group and will be ready to service operations.
101///
102/// Replicated environments can be created with node type Electable or
103/// Secondary. Electable nodes can be masters or replicas, and participate in
104/// both master elections and commit durability decisions. Secondary nodes can
105/// only be replicas, not masters, and do not participate in either elections or
106/// durability decisions.
107///
108/// # Example
109///
110/// ```ignore
111/// use noxu_rep::{ReplicatedEnvironment, RepConfig};
112///
113/// let config = RepConfig::builder("my_group", "node1", "localhost")
114/// .node_port(5001)
115/// .build();
116/// let rep_env = ReplicatedEnvironment::new(config).unwrap();
117/// ```
118/// Outcome of [`ReplicatedEnvironment::syncup_with_feeder`] — the action taken
119/// by a live diverged-tail syncup. Port of the branch in JE
120/// `ReplicaFeederSyncup.execute` between a soft rollback and a network restore.
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub enum SyncupAction {
123 /// The divergent tail was rolled back to the matchpoint; resume streaming
124 /// from `start_vlsn` (`matchpoint + 1`). `matchpoint_vlsn == last VLSN`
125 /// means the replica was not diverged and nothing was truncated.
126 RolledBack { matchpoint_vlsn: u64, start_vlsn: u64 },
127 /// No safe rollback (no common matchpoint, or it would cross a committed
128 /// txn); the replica must do a full network restore.
129 NeedsRestore,
130}
131
132pub struct ReplicatedEnvironment {
133 /// The replication configuration for this node.
134 config: RepConfig,
135 /// Tracks the current node state (Detached, Unknown, Master, Replica).
136 node_state: NodeStateMachine,
137 /// Service for managing the replication group membership.
138 group_service: GroupService,
139 /// Maps VLSNs to log file positions.
140 ///
141 /// Wrapped in `Arc` so that background daemons (election driver,
142 /// VLSN-index persistence flusher) can share access without
143 /// borrowing the env. Closes finding F11 (
144 /// the 2026 review).
145 vlsn_index: Arc<VlsnIndex>,
146 /// Tracks acknowledgments from replicas (used by master).
147 ack_tracker: AckTracker,
148 /// Replication statistics.
149 stats: RepStats,
150 /// Active feeder threads (master -> replica streams).
151 feeders: RwLock<Vec<Feeder>>,
152 /// Replica stream for receiving updates from the master.
153 replica_stream: ReplicaStream,
154 /// Tracks the current master node.
155 master_tracker: MasterTracker,
156 /// State change listeners.
157 listeners: RwLock<Vec<Arc<dyn StateChangeListener>>>,
158 /// Shutdown flag.
159 shutdown: AtomicBool,
160 /// Service dispatcher — listens on the replication port and routes
161 /// incoming connections to the appropriate service handler (feeder, etc.).
162 ///
163 /// `Plain`: plain TCP (default / Phase-2 behaviour).
164 /// `Tls`: TLS + mTLS enforcement (Phase 3, when `RepConfig::tls_config` is set
165 /// and `transport_kind` is `Tls`).
166 ///
167 /// `None` only when the bind address cannot be resolved.
168 tcp_dispatcher: Option<AnyServiceDispatcher>,
169 /// The address the `tcp_dispatcher` is actually bound to (may differ from
170 /// the configured port when port 0 is used in tests).
171 bound_addr: Option<SocketAddr>,
172
173 /// Optional live `EnvironmentImpl` wired in via [`with_environment`].
174 ///
175 /// When set, `become_master` spawns a `FeederRunner` per replica using
176 /// `EnvironmentLogScanner`, and `become_replica` spawns a
177 /// `ReplicaReceiver` thread using `EnvironmentLogWriter`.
178 ///
179 /// In HA.
180 env_impl: StdMutex<Option<Arc<EnvironmentImpl>>>,
181
182 /// Background I/O thread handles spawned during state transitions.
183 ///
184 /// Stored so that `close()` can join them cleanly. Each handle is
185 /// `Option` so we can `take()` it in `close()`.
186 io_threads: StdMutex<Vec<std::thread::JoinHandle<()>>>,
187
188 /// Shutdown flag shared with I/O threads so they terminate when the
189 /// environment is closed.
190 io_shutdown: AtomicBool,
191
192 /// Whether the RESTORE service has been registered on the TCP dispatcher.
193 ///
194 /// When `config.env_home` is `None` at construction time, registration is
195 /// deferred until `with_environment()` provides the env home path.
196 restore_registered: AtomicBool,
197
198 /// In-memory log queue used by the peer feeder service.
199 ///
200 /// When this node is a replica, `apply_entry()` pushes each received log
201 /// entry here. The `PeerFeederService` registered on the TCP dispatcher
202 /// reads from this queue to stream entries to downstream replicas that
203 /// are behind this node (peer-to-peer log distribution, HA style).
204 peer_scanner: Arc<PeerLogScanner>,
205
206 /// Durable Transaction VLSN (D7, JE RepNode.dtvlsn): the highest VLSN
207 /// known to have been replicated to a *majority* of the electable
208 /// replicas. On a master it is computed from feeder ack/heartbeat progress
209 /// (`update_dtvlsn_from_feeders`); on a replica it is set from commit/abort
210 /// records in the stream (`set_dtvlsn`). It advances monotonically (an
211 /// `update_max`). 0 = NULL_VLSN. Used by the election ranking (D2) so the
212 /// most-durable node, not merely the highest-raw-VLSN node, wins.
213 dtvlsn: std::sync::atomic::AtomicU64,
214
215 /// Shared acceptor state used by the ELECTION service handler.
216 /// The election driver updates `own_vlsn` / `own_term` here as the
217 /// node progresses; incoming acceptor sessions read it on every
218 /// connection so their replies always reflect the local node's
219 /// most recent state. Closes finding F6.
220 election_state: Arc<ElectionAcceptorState>,
221
222 /// Self-referential `Weak` populated once the env has been wrapped
223 /// in an `Arc`. Used by the replica I/O thread spawned in
224 /// `become_replica` so it can call `bootstrap_via_dispatcher` when
225 /// the master signals `NeedsRestore`.
226 ///
227 /// Populated lazily via [`Self::init_self_weak`] from `open()` and
228 /// the test harness. When unset (callers that build the env via
229 /// raw `Arc::new(Self::new(...))` and never call `init_self_weak`)
230 /// the I/O thread falls back to operator-driven bootstrap.
231 self_weak: OnceLock<Weak<Self>>,
232
233 // -----------------------------------------------------------------------
234 // C-C2: active push-feeder infrastructure
235 // -----------------------------------------------------------------------
236 /// Per-replica channels injected via [`Self::register_feeder_channel`].
237 ///
238 /// When [`Self::become_master`] is called (or when the node is already
239 /// master), a [`FeederRunner`] thread is spawned for each registered
240 /// channel, actively streaming entries to that replica over the channel.
241 ///
242 /// Using `register_feeder_channel` is the primary integration point for
243 /// the push-based feeder path. Production deployments wire in a
244 /// `TcpChannel`; test code uses `LocalChannelPair`.
245 feeder_channels: StdMutex<HashMap<String, Arc<dyn crate::net::Channel>>>,
246
247 /// Per-replica dedicated entry queues backing the push-feeder path.
248 ///
249 /// Each `FeederRunner` thread reads exclusively from its replica's queue.
250 /// [`Self::replicate_entry`] and [`Self::apply_entry`] fan out into all
251 /// registered queues so the push runners receive entries without competing
252 /// with [`PeerFeederService`] for ownership of `peer_scanner`.
253 feeder_queues: std::sync::RwLock<HashMap<String, Arc<PeerLogScanner>>>,
254
255 /// Active `FeederRunner` references for acked-VLSN queries and
256 /// clean shutdown (M-4: wait for replicas to catch up).
257 active_feeder_runners: StdMutex<HashMap<String, Arc<FeederRunner>>>,
258
259 /// Monotone VLSN counter shared with the wired `EnvironmentImpl`.
260 ///
261 /// Installed into the environment via
262 /// `EnvironmentImpl::set_replication_vlsn_counter()` when
263 /// `with_environment` is called. Each `log_txn_commit` on the master
264 /// atomically increments this counter and writes a VLSN-tagged WAL entry,
265 /// which `EnvironmentLogScanner` then picks up without any
266 /// `replicate_entry` call from the application.
267 wal_vlsn_counter: Arc<std::sync::atomic::AtomicU64>,
268
269 /// REP-10 (C): the replica-side consistency tracker, built from the
270 /// REP-7 `last_applied_vlsn` handle when the replica replay thread starts
271 /// (`become_replica`). `None` on a master or before replay is wired.
272 ///
273 /// A read that begins on a replica with a non-`NoConsistency` policy waits
274 /// on this tracker (`begin_read_consistency`). Port of
275 /// `RepImpl.getConsistency` / `Replica.getConsistencyTracker`.
276 consistency_tracker: StdMutex<Option<crate::ConsistencyTracker>>,
277}
278
279impl ReplicatedEnvironment {
280 /// Create a new replicated environment.
281 ///
282 ///
283 ///
284 /// Creates a replicated environment handle and starts participating in the
285 /// replication group. The node's state is determined when it joins the
286 /// group, and mastership is not preconfigured. If the group has no current
287 /// master, creation will trigger an election to determine whether this node
288 /// will participate as a Master or a Replica.
289 ///
290 /// A brand new node will always join an existing group as a Replica, unless
291 /// it is the very first electable node that is creating the group. In that
292 /// case it joins as the Master of the newly formed singleton group.
293 pub fn new(config: RepConfig) -> Result<Self> {
294 // mTLS Phase 2 (v3.1.0): peer_allowlist enforcement is real at the
295 // TLS channel layer (TlsTcpChannelListener::bind_with_tls_and_allowlist).
296 // Phase 3 (this release): when RepConfig::tls_config is set AND
297 // transport_kind is Tls, the service dispatcher itself enforces mTLS
298 // via TlsTcpServiceDispatcher. For the remaining cases (no TlsConfig
299 // or non-TLS transport) keep the Phase-2 accurate warn.
300 if !config.peer_allowlist.is_empty() {
301 match config.transport_kind {
302 crate::rep_config::RepTransportKind::Tls => {
303 if config.tls_config.is_some() {
304 log::info!(
305 "[{}] peer_allowlist ({} entries) + tls_config set; \
306 mTLS will be enforced on the service dispatcher.",
307 config.node_name,
308 config.peer_allowlist.len(),
309 );
310 } else {
311 log::info!(
312 "[{}] peer_allowlist configured ({} entries) but \
313 tls_config is None — the service dispatcher will \
314 use plain TCP. Set RepConfig::tls_config to \
315 activate end-to-end mTLS on this path.",
316 config.node_name,
317 config.peer_allowlist.len(),
318 );
319 }
320 }
321 _ => {
322 log::warn!(
323 "[{}] peer_allowlist is configured ({} entries) but \
324 transport_kind is not Tls — the allowlist has no \
325 effect without TLS transport. Set \
326 RepTransportKind::Tls to activate mTLS enforcement.",
327 config.node_name,
328 config.peer_allowlist.len(),
329 );
330 }
331 }
332 }
333 let node_state = NodeStateMachine::new();
334 let group_service = GroupService::new(config.group_name.clone());
335 let vlsn_index = {
336 // F11: try to load a previously persisted vlsn.idx from
337 // env_home if one exists. A successfully loaded index lets a
338 // restarted replica resume from where it left off without a
339 // full network restore; a missing or corrupt file falls back
340 // to a fresh in-memory index (caller will need to bootstrap).
341 if let Some(ref home) = config.env_home {
342 match crate::vlsn::persist::load_from_disk(home) {
343 Ok(Some(idx)) => {
344 log::info!(
345 "Node '{}' loaded persisted VLSN index from {} \
346 ({} entries, latest vlsn={})",
347 config.node_name,
348 home.display(),
349 idx.snapshot_entries().len(),
350 idx.get_latest_vlsn(),
351 );
352 Arc::new(idx)
353 }
354 Ok(None) => Arc::new(VlsnIndex::new(10)),
355 Err(e) => {
356 log::warn!(
357 "Node '{}' failed to load persisted VLSN index \
358 from {}: {} (treating as fresh node — network \
359 restore required)",
360 config.node_name,
361 home.display(),
362 e
363 );
364 // Best-effort: remove the corrupt file so the
365 // next persist cycle writes a clean one. A
366 // missing file is the "fresh node" baseline.
367 let _ = std::fs::remove_file(
368 crate::vlsn::persist::index_path(home),
369 );
370 Arc::new(VlsnIndex::new(10))
371 }
372 }
373 } else {
374 Arc::new(VlsnIndex::new(10))
375 }
376 };
377 let ack_tracker = AckTracker::new();
378 let stats = RepStats::new();
379 let feeders = RwLock::new(Vec::new());
380 let replica_stream = ReplicaStream::new();
381 let master_tracker = MasterTracker::new(DEFAULT_HEARTBEAT_TIMEOUT);
382
383 // Start the service dispatcher.
384 //
385 // Phase 3: when RepConfig::tls_config is set AND transport_kind is Tls,
386 // start a TlsTcpServiceDispatcher (mTLS enforced). Otherwise fall back
387 // to the plain-TCP TcpServiceDispatcher.
388 let listen_addr_str =
389 format!("{}:{}", config.node_host, config.node_port);
390 let mut restore_registered_init = false;
391
392 // Returns (AnyServiceDispatcher, bound_addr) or (None, None) on error.
393 let (tcp_dispatcher, bound_addr) = match listen_addr_str
394 .parse::<SocketAddr>()
395 {
396 Ok(addr) => {
397 let build_result: Result<(AnyServiceDispatcher, SocketAddr)> =
398 Self::build_dispatcher(&config, addr);
399 match build_result {
400 Ok((dispatcher, bound)) => {
401 // Register the network restore handler.
402 if let Some(ref home) = config.env_home {
403 let restore_server =
404 NetworkRestoreServer::new(home.clone());
405 dispatcher.register(
406 RESTORE_SERVICE_NAME,
407 Arc::new(restore_server),
408 );
409 log::debug!(
410 "Node '{}' RESTORE service registered \
411 (env_home={})",
412 config.node_name,
413 home.display(),
414 );
415 restore_registered_init = true;
416 }
417 let kind =
418 if dispatcher.is_tls() { "TLS" } else { "TCP" };
419 log::info!(
420 "Node '{}' {} service dispatcher started on {}",
421 config.node_name,
422 kind,
423 bound
424 );
425 (Some(dispatcher), Some(bound))
426 }
427 Err(e) => {
428 log::warn!(
429 "Node '{}' failed to start service dispatcher \
430 on {}: {}",
431 config.node_name,
432 listen_addr_str,
433 e
434 );
435 (None, None)
436 }
437 }
438 }
439 Err(e) => {
440 log::warn!(
441 "Node '{}' cannot parse listen address '{}': {}",
442 config.node_name,
443 listen_addr_str,
444 e
445 );
446 (None, None)
447 }
448 };
449
450 // Build the in-memory peer log scanner; register the peer feeder
451 // service on the dispatcher so downstream replicas can connect.
452 let peer_scanner = Arc::new(PeerLogScanner::new());
453 // F5/F31: build the acceptor state with persistence enabled when
454 // env_home is configured. Crash-durable promises are required
455 // for the Paxos safety invariant after a process restart.
456 let election_state =
457 Arc::new(if let Some(ref home) = config.env_home {
458 ElectionAcceptorState::with_env_home(
459 config.node_name.clone(),
460 1,
461 home,
462 )
463 } else {
464 ElectionAcceptorState::new(config.node_name.clone(), 1)
465 });
466 if let Some(ref dispatcher) = tcp_dispatcher {
467 let service = PeerFeederService::new(Arc::clone(&peer_scanner));
468 dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(service));
469 log::debug!(
470 "Node '{}' PEER_FEEDER service registered",
471 config.node_name,
472 );
473 // F6: register the ELECTION service so peers can run
474 // run_acceptor against this node when proposing.
475 let election_svc =
476 Arc::new(ElectionService::new(Arc::clone(&election_state)));
477 dispatcher.register(ELECTION_SERVICE_NAME, election_svc);
478 log::debug!(
479 "Node '{}' ELECTION service registered",
480 config.node_name,
481 );
482 }
483
484 let env = Self {
485 config,
486 node_state,
487 group_service,
488 vlsn_index,
489 ack_tracker,
490 stats,
491 feeders,
492 replica_stream,
493 master_tracker,
494 listeners: RwLock::new(Vec::new()),
495 shutdown: AtomicBool::new(false),
496 tcp_dispatcher,
497 bound_addr,
498 env_impl: StdMutex::new(None),
499 io_threads: StdMutex::new(Vec::new()),
500 io_shutdown: AtomicBool::new(false),
501 restore_registered: AtomicBool::new(restore_registered_init),
502 peer_scanner,
503 dtvlsn: std::sync::atomic::AtomicU64::new(0),
504 election_state,
505 self_weak: OnceLock::new(),
506 feeder_channels: StdMutex::new(HashMap::new()),
507 feeder_queues: std::sync::RwLock::new(HashMap::new()),
508 active_feeder_runners: StdMutex::new(HashMap::new()),
509 wal_vlsn_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
510 consistency_tracker: StdMutex::new(None),
511 };
512
513 Ok(env)
514 }
515
516 /// Open a replicated environment with the standard production
517 /// lifecycle.
518 ///
519 /// This is the entry point recommended by the mdBook chapters:
520 /// it allocates the `ReplicatedEnvironment`, registers all
521 /// services on the TCP dispatcher, and spawns the **election
522 /// driver** background thread that runs Paxos rounds against
523 /// known peers until the node has resolved into either Master or
524 /// Replica state.
525 ///
526 /// Closes finding F6 of the 2026 review.
527 ///
528 /// Use [`ReplicatedEnvironment::new`] directly only when the
529 /// caller plans to drive state transitions explicitly (test
530 /// harnesses, scripted bootstrap, recovery tooling).
531 pub fn open(config: RepConfig) -> Result<Arc<Self>> {
532 let env = Arc::new(Self::new(config)?);
533 env.init_self_weak();
534 env.start_election_driver();
535 env.start_vlsn_persistence_daemon();
536 env.register_admin_service();
537 Ok(env)
538 }
539
540 /// Build the service dispatcher for this node.
541 ///
542 /// Phase 3 logic: when `config.transport_kind == Tls` AND
543 /// `config.tls_config` is `Some`, start a
544 /// [`crate::net::service_dispatcher::TlsTcpServiceDispatcher`] that
545 /// enforces mTLS with the configured `peer_allowlist`. Otherwise
546 /// start the plain-TCP [`TcpServiceDispatcher`].
547 ///
548 /// Returns `(dispatcher, bound_addr)` or a `RepError` on bind / TLS
549 /// config failure.
550 fn build_dispatcher(
551 #[cfg_attr(not(feature = "tls-rustls"), allow(unused_variables))]
552 config: &RepConfig,
553 addr: SocketAddr,
554 ) -> Result<(AnyServiceDispatcher, SocketAddr)> {
555 #[cfg(feature = "tls-rustls")]
556 if config.transport_kind == crate::rep_config::RepTransportKind::Tls {
557 use crate::auth::PeerAllowlist;
558 use crate::net::service_dispatcher::TlsTcpServiceDispatcher;
559 let tls = config.tls_config.as_ref().ok_or_else(|| {
560 RepError::ConfigError(format!(
561 "node '{}': transport_kind=Tls requires a tls_config",
562 config.node_name,
563 ))
564 })?;
565 let allowlist =
566 PeerAllowlist::new(config.peer_allowlist.iter().cloned());
567 // Fail-closed: an empty allowlist with TLS transport is a
568 // misconfiguration. The same policy is enforced at the TLS
569 // listener and QUIC constructors; downgrading to plain TCP here
570 // would be a silent security regression for a node that asked
571 // for TLS.
572 if allowlist.is_empty() {
573 return Err(RepError::ConfigError(format!(
574 "node '{}': transport_kind=Tls requires a non-empty \
575 peer_allowlist (mTLS enforcement is fail-closed)",
576 config.node_name,
577 )));
578 }
579 let disp = TlsTcpServiceDispatcher::new(addr, tls, allowlist)?;
580 let bound = disp.start()?;
581 return Ok((AnyServiceDispatcher::Tls(disp), bound));
582 }
583 // Plain-TCP dispatcher (default or when TLS config is missing).
584 let disp = TcpServiceDispatcher::new(addr).map_err(|e| {
585 RepError::NetworkError(format!("TCP dispatcher init: {e}"))
586 })?;
587 let bound = disp.start()?;
588 Ok((AnyServiceDispatcher::Plain(disp), bound))
589 }
590
591 /// Populate the env's self-referential `Weak` so background
592 /// threads can obtain a back-reference for auto-orchestrated
593 /// follow-up actions (e.g. replica auto-bootstrap on
594 /// `NeedsRestore`). Idempotent: subsequent calls are silent
595 /// no-ops because the inner [`OnceLock`] only accepts one set.
596 ///
597 /// Callers that wrap the env in `Arc` and want auto-bootstrap
598 /// behaviour should call this immediately after construction.
599 /// `Self::open` already does so. Test harnesses that drive
600 /// transitions manually (`RepTestBase`) also call this so the
601 /// auto-bootstrap path is exercised in tests.
602 pub fn init_self_weak(self: &Arc<Self>) {
603 let _ = self.self_weak.set(Arc::downgrade(self));
604 }
605
606 /// Register the `ADMIN` service handler on the TCP dispatcher.
607 ///
608 /// Closes findings F7 / F8. Holds a `Weak<Self>` so the handler
609 /// does not extend the env's lifetime. Idempotent: re-registering
610 /// is harmless because `TcpServiceDispatcher::register` overwrites
611 /// the existing handler.
612 pub fn register_admin_service(self: &Arc<Self>) {
613 if let Some(ref dispatcher) = self.tcp_dispatcher {
614 crate::group_admin::register_admin_service(
615 dispatcher,
616 Arc::downgrade(self),
617 );
618 log::debug!(
619 "Node '{}' ADMIN service registered",
620 self.config.node_name,
621 );
622 }
623 }
624
625 /// Spawn the VLSN-index persistence daemon (F11).
626 ///
627 /// Periodically (every 2 seconds) snapshots the in-memory
628 /// `VlsnIndex` to `<env_home>/vlsn.idx` so a clean restart can
629 /// resume from where the replica left off without a full network
630 /// restore. No-op when `config.env_home` is `None`.
631 ///
632 /// Idempotent: only one daemon is ever spawned per env.
633 pub fn start_vlsn_persistence_daemon(self: &Arc<Self>) {
634 let Some(home) = self.config.env_home.clone() else {
635 return;
636 };
637 {
638 let threads = self.io_threads.lock().unwrap();
639 if threads.iter().any(|h| {
640 h.thread()
641 .name()
642 .is_some_and(|n| n.starts_with("noxu-vlsn-flush-"))
643 }) {
644 return;
645 }
646 }
647
648 let vlsn_index = Arc::clone(&self.vlsn_index);
649 let name = format!("noxu-vlsn-flush-{}", self.config.node_name);
650 let me = Arc::clone(self);
651 let interval = Duration::from_secs(2);
652
653 let handle = std::thread::Builder::new()
654 .name(name)
655 .spawn(move || {
656 use std::sync::atomic::Ordering;
657 let mut last_persisted_vlsn: u64 = 0;
658 while !me.io_shutdown.load(Ordering::SeqCst)
659 && !me.is_shutdown()
660 {
661 std::thread::sleep(interval);
662 if me.io_shutdown.load(Ordering::SeqCst) {
663 break;
664 }
665 let latest = vlsn_index.get_latest_vlsn();
666 if latest == last_persisted_vlsn {
667 // Nothing new to flush.
668 continue;
669 }
670 // X-2: cap the flush at the last durable checkpoint's
671 // end LSN so the persisted VLSN index never claims
672 // VLSNs beyond the durable B-tree state. After a crash
673 // the recovered tree and the index will be coherent.
674 let cap_lsn = me
675 .env_impl
676 .lock()
677 .unwrap()
678 .as_ref()
679 .and_then(|e| e.get_checkpointer())
680 .map(|c| c.get_last_checkpoint_end())
681 .unwrap_or(noxu_util::NULL_LSN);
682 match crate::vlsn::persist::flush_to_disk_capped(
683 &vlsn_index,
684 &home,
685 cap_lsn,
686 ) {
687 Ok(n) => {
688 log::trace!(
689 "vlsn-flush: persisted {} entries (latest vlsn={}, cap_lsn={:?})",
690 n,
691 latest,
692 cap_lsn,
693 );
694 last_persisted_vlsn = latest;
695 }
696 Err(e) => {
697 log::warn!(
698 "vlsn-flush: failed to persist VLSN index to {}: {}",
699 home.display(),
700 e
701 );
702 }
703 }
704 }
705 // Final flush on shutdown so a clean close is recoverable.
706 // Cap at the last checkpoint even for the shutdown flush.
707 let cap_lsn = me
708 .env_impl
709 .lock()
710 .unwrap()
711 .as_ref()
712 .and_then(|e| e.get_checkpointer())
713 .map(|c| c.get_last_checkpoint_end())
714 .unwrap_or(noxu_util::NULL_LSN);
715 if let Err(e) = crate::vlsn::persist::flush_to_disk_capped(
716 &vlsn_index,
717 &home,
718 cap_lsn,
719 ) {
720 log::warn!(
721 "vlsn-flush (final): failed to persist VLSN index: {}",
722 e
723 );
724 }
725 })
726 .expect("failed to spawn noxu-vlsn-flush thread");
727
728 self.io_threads.lock().unwrap().push(handle);
729 log::debug!(
730 "Node '{}' VLSN persistence daemon started",
731 self.config.node_name,
732 );
733 }
734
735 /// Spawn the election driver background thread.
736 ///
737 /// While the env is in `Detached` or `Unknown` state and no master
738 /// is known, the driver periodically attempts a Paxos election
739 /// against peers in `GroupService` (whose ELECTION services were
740 /// registered at `open()` time). On success the driver calls
741 /// `become_master` (if this node is the winner) or `become_replica`
742 /// (otherwise). On failure (no quorum), the driver waits
743 /// `config.election_timeout` and tries again.
744 ///
745 /// The driver respects `io_shutdown`; on env close the loop exits
746 /// promptly.
747 ///
748 /// Idempotent: a second call is a no-op (only one driver thread is
749 /// ever spawned per env).
750 pub fn start_election_driver(self: &Arc<Self>) {
751 use std::sync::atomic::Ordering;
752 // Reuse io_shutdown for cancellation; a successful spawn is
753 // recorded by appending to io_threads, so a duplicate call
754 // would re-add a thread — we use a one-shot `AtomicBool`
755 // sentinel placed in the io_shutdown's slot via a new field.
756 // Cheaper: a static name check on io_threads is impossible;
757 // instead, gate spawning on whether any io_thread already
758 // carries the driver name.
759 {
760 let threads = self.io_threads.lock().unwrap();
761 if threads.iter().any(|h| {
762 h.thread()
763 .name()
764 .is_some_and(|n| n.starts_with("noxu-election-"))
765 }) {
766 return;
767 }
768 }
769
770 let me = Arc::clone(self);
771 let name = format!("noxu-election-{}", self.config.node_name);
772 let handle = std::thread::Builder::new()
773 .name(name)
774 .spawn(move || {
775 me.run_election_loop();
776 })
777 .expect("failed to spawn election driver thread");
778 self.io_threads.lock().unwrap().push(handle);
779 log::debug!("Node '{}' election driver started", self.config.node_name,);
780 // Keep ordering sane on the io_shutdown flag.
781 let _ = self.io_shutdown.load(Ordering::SeqCst);
782 }
783
784 /// Body of the election driver loop. Public only for tests; called
785 /// by [`Self::start_election_driver`].
786 fn run_election_loop(self: Arc<Self>) {
787 use std::sync::atomic::Ordering;
788 // Maintain an internal monotonically increasing election term.
789 // Each successful or failed round bumps the term so retries do
790 // not collide with stale acceptor promises.
791 let mut term: u64 = 1;
792
793 loop {
794 if self.io_shutdown.load(Ordering::SeqCst) {
795 return;
796 }
797 if self.is_shutdown() {
798 return;
799 }
800
801 let state = self.node_state.get_state();
802 // Stop driving once we've resolved into Master/Replica;
803 // re-arm only if the node returns to Unknown.
804 if matches!(state, NodeState::Master | NodeState::Replica) {
805 std::thread::sleep(Duration::from_millis(200));
806 continue;
807 }
808 if matches!(state, NodeState::Shutdown) {
809 return;
810 }
811
812 // Probe peers for an active master via the existing
813 // GroupService cache. In the absence of a heartbeat path
814 // we rely on master_tracker (set by become_replica from
815 // the receive loop).
816 if let Some(master_name) = self.master_tracker.get_master()
817 && master_name != self.config.node_name
818 {
819 let _ = self.become_replica(&master_name);
820 continue;
821 }
822
823 // Snapshot peers to dial for ELECTION.
824 let peers: Vec<(String, SocketAddr)> = self
825 .group_service
826 .get_all_nodes()
827 .into_iter()
828 .filter(|n| n.name != self.config.node_name)
829 .filter_map(|n| {
830 format!("{}:{}", n.host, n.port)
831 .parse::<SocketAddr>()
832 .ok()
833 .map(|a| (n.name, a))
834 })
835 .collect();
836
837 // Build the local rep group view used by run_election to
838 // compute quorum and resolve the winner name. Include
839 // self.
840 let group = self.local_rep_group_with_self();
841
842 // Update election state for any concurrent acceptor calls.
843 let our_vlsn = self.vlsn_index.get_latest_vlsn();
844 self.election_state.set_vlsn(our_vlsn);
845 self.election_state.set_term(term);
846 // D2: advertise our DTVLSN as the major election-ranking key.
847 self.election_state.set_dtvlsn(self.get_dtvlsn());
848
849 // Connect to each peer's ELECTION service. Failures are
850 // tolerated: a peer that doesn't answer simply contributes
851 // no vote. The election may still reach quorum in the
852 // remaining peers.
853 let mut channels: Vec<Arc<dyn crate::net::channel::Channel>> =
854 Vec::new();
855 for (peer_name, addr) in &peers {
856 match crate::net::service_dispatcher::connect_to_service(
857 *addr,
858 ELECTION_SERVICE_NAME,
859 ) {
860 Ok(ch) => {
861 let arc: Arc<dyn crate::net::channel::Channel> =
862 Arc::new(ch);
863 channels.push(arc);
864 }
865 Err(e) => {
866 log::trace!(
867 "election driver: peer {} ({}) unreachable: {}",
868 peer_name,
869 addr,
870 e
871 );
872 }
873 }
874 }
875
876 // Resolve our own node_id from the group; if not present
877 // we cannot run an election (closed-world guard — see F22).
878 let self_node_id =
879 group.get_node(&self.config.node_name).map(|n| n.node_id());
880 let self_node_id = match self_node_id {
881 Some(id) => id,
882 None => {
883 log::warn!(
884 "election driver: node '{}' not registered in \
885 own group view; sleeping",
886 self.config.node_name
887 );
888 std::thread::sleep(Duration::from_millis(200));
889 continue;
890 }
891 };
892
893 log::debug!(
894 "election driver on '{}': starting term={} with {} peers",
895 self.config.node_name,
896 term,
897 channels.len(),
898 );
899 let outcome = crate::elections::paxos::run_election_with_phi_dtvlsn(
900 self_node_id,
901 &self.config.node_name,
902 &group,
903 &channels,
904 our_vlsn,
905 /* priority */ 1,
906 term,
907 /* own_dtvlsn (D2 major ranking key) */
908 self.get_dtvlsn(),
909 None,
910 std::time::Duration::from_millis(500),
911 );
912
913 match outcome {
914 Some(winner_id) if winner_id == self_node_id => {
915 if let Err(e) = self.become_master(term) {
916 log::warn!(
917 "election driver: become_master failed: {}",
918 e
919 );
920 } else {
921 log::info!(
922 "election driver: '{}' became master at term {}",
923 self.config.node_name,
924 term,
925 );
926 }
927 }
928 Some(winner_id) => {
929 if let Some(winner_node) = group
930 .get_nodes()
931 .into_iter()
932 .find(|n| n.node_id() == winner_id)
933 {
934 if let Err(e) = self.become_replica(&winner_node.name) {
935 log::warn!(
936 "election driver: become_replica failed: {}",
937 e
938 );
939 } else {
940 log::info!(
941 "election driver: '{}' became replica of '{}' at term {}",
942 self.config.node_name,
943 winner_node.name,
944 term,
945 );
946 }
947 }
948 }
949 None => {
950 log::debug!(
951 "election driver on '{}' term={}: no quorum",
952 self.config.node_name,
953 term,
954 );
955 }
956 }
957
958 term = term.saturating_add(1);
959 // Back off so we don't pin the loop on transient failures.
960 std::thread::sleep(
961 self.config.election_timeout.min(Duration::from_millis(500)),
962 );
963 }
964 }
965
966 /// Internal: a `RepGroup` snapshot that includes self.
967 fn local_rep_group_with_self(&self) -> crate::rep_group::RepGroup {
968 let mut group = self.get_rep_group();
969 // Ensure self is present in the group view; the
970 // group_service does not auto-register the local node.
971 if group.get_node(&self.config.node_name).is_none() {
972 let mut self_node = crate::rep_node::RepNode::new(
973 self.config.node_name.clone(),
974 self.config.node_type,
975 self.config.node_host.clone(),
976 self.config.node_port,
977 /* node_id */ 0,
978 );
979 // Stable self node_id derived from the name hash so
980 // re-creations in the same process don't collide.
981 use std::hash::{Hash, Hasher};
982 let mut hasher = std::collections::hash_map::DefaultHasher::new();
983 self.config.node_name.hash(&mut hasher);
984 // Restrict to a u32 range and avoid 0 (reserved for
985 // "unknown").
986 let id = ((hasher.finish() as u32) | 1).max(1);
987 self_node.node_id = id;
988 group.add_node(self_node);
989 }
990 group
991 }
992
993 /// Return the socket address the TCP service dispatcher is bound to.
994 ///
995 /// This may differ from the configured `node_port` when port 0 is used
996 /// (the OS assigns an ephemeral port). Returns `None` if the dispatcher
997 /// could not be started (e.g. the address is not resolvable).
998 pub fn bound_addr(&self) -> Option<SocketAddr> {
999 self.bound_addr
1000 }
1001
1002 /// Wire a live `EnvironmentImpl` into this replicated environment.
1003 ///
1004 /// After this call, state transitions (`become_master`, `become_replica`)
1005 /// will spawn real feeder/receiver I/O threads backed by the live log.
1006 ///
1007 /// If the RESTORE service was not registered at construction time (because
1008 /// `config.env_home` was `None`), it is registered here using the
1009 /// environment's actual home path. This mirrors`RepNode.envSetup()`
1010 /// which registers the restore handler during environment wiring.
1011 ///
1012 /// Environment reference wiring.
1013 /// `EnvironmentImpl` via `RepImpl.repNode.envImpl` in HA.
1014 pub fn with_environment(&self, env: Arc<EnvironmentImpl>) {
1015 // Register RESTORE service lazily if not already done.
1016 if !self.restore_registered.load(Ordering::SeqCst)
1017 && let Some(ref dispatcher) = self.tcp_dispatcher
1018 {
1019 let env_home = env.get_env_home().to_path_buf();
1020 let restore_server = NetworkRestoreServer::new(env_home.clone());
1021 dispatcher.register(RESTORE_SERVICE_NAME, Arc::new(restore_server));
1022 self.restore_registered.store(true, Ordering::SeqCst);
1023 log::debug!(
1024 "Node '{}' RESTORE service registered via with_environment \
1025 (env_home={})",
1026 self.config.node_name,
1027 env_home.display(),
1028 );
1029 }
1030
1031 // X-14: rebuild the VLSN index from recovery-replayed LN entries.
1032 // After a crash the on-disk vlsn.idx may be stale (either ahead of
1033 // the recovered B-tree, or behind if vlsn.idx was not flushed
1034 // after the last checkpoint). Re-registering all (vlsn, lsn) pairs
1035 // from the redo pass gives a consistent in-memory index.
1036 if !env.recovery_vlsns.is_empty() {
1037 log::info!(
1038 "Node '{}': rebuilding VLSN index from {} recovered entries",
1039 self.config.node_name,
1040 env.recovery_vlsns.len(),
1041 );
1042 for &(vlsn, lsn_u64) in &env.recovery_vlsns {
1043 let lsn = noxu_util::Lsn::from_u64(lsn_u64);
1044 self.vlsn_index.register(
1045 vlsn,
1046 lsn.file_number(),
1047 lsn.file_offset(),
1048 );
1049 }
1050 }
1051
1052 // X-1: truncate the VLSN index to the rollback matchpoint if recovery
1053 // detected a completed rollback period. The matchpoint is the highest
1054 // LSN that is still valid after the rollback; entries with higher VLSNs
1055 // correspond to data that was rolled back and must not appear in the
1056 // index.
1057 if let Some(matchpoint_lsn_u64) = env.recovery_rollback_matchpoint {
1058 // Find the latest VLSN whose LSN is at or before the matchpoint.
1059 // Scan the recovered VLSN pairs (sorted ascending) to find the
1060 // boundary.
1061 let safe_vlsn = env
1062 .recovery_vlsns
1063 .iter()
1064 .rev()
1065 .find(|&&(_, lsn_u64)| lsn_u64 <= matchpoint_lsn_u64)
1066 .map(|&(vlsn, _)| vlsn)
1067 .unwrap_or(0);
1068 log::info!(
1069 "Node '{}': truncating VLSN index after vlsn={} \
1070 (rollback matchpoint lsn={:#x})",
1071 self.config.node_name,
1072 safe_vlsn,
1073 matchpoint_lsn_u64,
1074 );
1075 self.vlsn_index.truncate_after(safe_vlsn);
1076 }
1077
1078 *self.env_impl.lock().unwrap() = Some(Arc::clone(&env));
1079
1080 // C-C2b: install the VLSN counter so log_txn_commit writes
1081 // VLSN-tagged headers. When become_master then spawns an
1082 // EnvironmentLogScanner-backed FeederRunner, it will find these
1083 // entries and auto-feed them to replicas without any
1084 // replicate_entry call from the application.
1085 env.set_replication_vlsn_counter(Arc::clone(&self.wal_vlsn_counter));
1086 }
1087
1088 /// Get the current node state.
1089 ///
1090 ///
1091 ///
1092 /// Returns the current state of the node associated with this replication
1093 /// environment. If the caller's intent is to track the state of the node,
1094 /// `StateChangeListener` may be a more convenient and efficient approach.
1095 pub fn get_state(&self) -> NodeState {
1096 self.node_state.get_state()
1097 }
1098
1099 /// Check if this node is the master.
1100 ///
1101 /// Returns true if the node's current state is Master.
1102 pub fn is_master(&self) -> bool {
1103 self.node_state.get_state() == NodeState::Master
1104 }
1105
1106 /// Returns true if this node is an *authoritative* master (D4, JE
1107 /// `ElectionQuorum.isAuthoritativeMaster`): it is the group master AND it
1108 /// is still connected to enough replicas that, including itself, a
1109 /// SIMPLE_MAJORITY quorum is present.
1110 ///
1111 /// A master on the minority side of a network partition is NOT
1112 /// authoritative — it must not claim the special election ranking
1113 /// (`MASTER_RANKING`) nor (eventually) continue accepting writes, so the
1114 /// majority side can elect a fresh master without it competing
1115 /// (split-brain prevention).
1116 ///
1117 /// "Active replica count" = the number of currently-connected push-feeder
1118 /// runners serving *electable* peers (Monitors/Secondaries do not count
1119 /// toward the election quorum). `+ 1` for this master itself.
1120 pub fn is_authoritative_master(&self) -> bool {
1121 if !self.is_master() {
1122 return false;
1123 }
1124 let group = self.get_rep_group();
1125 // Total electable nodes (incl. self) — peers + this master.
1126 let electable_total: usize = group
1127 .get_nodes()
1128 .iter()
1129 .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
1130 .count()
1131 + 1; // +1 for self/master (not registered as a peer)
1132
1133 // Active replicas = connected feeder runners whose peer is electable.
1134 let active_electable_replicas: usize = {
1135 let runners = self.active_feeder_runners.lock().unwrap();
1136 runners
1137 .keys()
1138 .filter(|name| {
1139 group
1140 .get_node(name)
1141 .map(|n| {
1142 n.node_type == crate::node_type::NodeType::Electable
1143 })
1144 .unwrap_or(false)
1145 })
1146 .count()
1147 };
1148 Self::authoritative_quorum_met(
1149 active_electable_replicas,
1150 electable_total,
1151 )
1152 }
1153
1154 /// Pure SIMPLE_MAJORITY quorum check for `is_authoritative_master` (JE
1155 /// `ElectionQuorum.isAuthoritativeMaster`): `(activeReplicas + 1) >=
1156 /// quorumSize` where `quorumSize = electableTotal / 2 + 1`.
1157 fn authoritative_quorum_met(
1158 active_electable_replicas: usize,
1159 electable_total: usize,
1160 ) -> bool {
1161 let quorum_size = electable_total / 2 + 1;
1162 (active_electable_replicas + 1) >= quorum_size
1163 }
1164
1165 /// Check if this node is a replica.
1166 ///
1167 /// Returns true if the node's current state is Replica.
1168 pub fn is_replica(&self) -> bool {
1169 self.node_state.get_state() == NodeState::Replica
1170 }
1171
1172 /// Returns true if the node is currently participating in the group
1173 /// as a Replica or a Master.
1174 pub fn is_active(&self) -> bool {
1175 self.node_state.get_state().is_active()
1176 }
1177
1178 /// Get the node name.
1179 ///
1180 ///
1181 ///
1182 /// Returns the unique name used to identify this replicated environment.
1183 pub fn get_node_name(&self) -> &str {
1184 self.config.node_name.as_str()
1185 }
1186
1187 /// Get the group name.
1188 ///
1189 /// Returns the name of the replication group this node belongs to.
1190 pub fn get_group_name(&self) -> &str {
1191 self.config.group_name.as_str()
1192 }
1193
1194 /// Get the current master (if known).
1195 ///
1196 /// Returns the name of the node that is currently the master, or None
1197 /// if the master is not known (e.g. the node is in the Unknown or
1198 /// Detached state).
1199 pub fn get_master_name(&self) -> Option<String> {
1200 self.master_tracker.get_master()
1201 }
1202
1203 /// Get the replication group info.
1204 ///
1205 ///
1206 ///
1207 /// Returns a description of the replication group as known by this node.
1208 /// The replicated group metadata is stored in a replicated database and
1209 /// updates are propagated by the current master node to all replicas. If
1210 /// this node is not the master, it is possible for its description of the
1211 /// group to be out of date.
1212 pub fn get_group(&self) -> &GroupService {
1213 &self.group_service
1214 }
1215
1216 /// Add a peer node to the replication group at runtime.
1217 ///
1218 /// The node is registered in the `GroupService` so elections and quorum
1219 /// calculations immediately reflect the new membership.
1220 pub fn add_peer(&self, node: crate::rep_node::RepNode) -> Result<()> {
1221 use crate::group_service::NodeInfo;
1222 use std::time::Instant;
1223
1224 let info = NodeInfo {
1225 name: node.name.clone(),
1226 node_type: node.node_type,
1227 host: node.host.clone(),
1228 port: node.port,
1229 node_id: node.node_id,
1230 joined_at: Instant::now(),
1231 last_seen: Instant::now(),
1232 is_active: true,
1233 known_vlsn: 0,
1234 log_range: None,
1235 read_capacity_pct: node.read_capacity_pct,
1236 write_capacity_pct: node.write_capacity_pct,
1237 latency_hint_ms: node.latency_hint_ms,
1238 };
1239 self.group_service.add_node(info)?;
1240 log::info!(
1241 "Node '{}': added peer '{}' ({}:{}) to group '{}'",
1242 self.config.node_name,
1243 node.name,
1244 node.host,
1245 node.port,
1246 self.config.group_name,
1247 );
1248
1249 // F9: if we are the current master, immediately register a
1250 // `Feeder` tracker for the new peer so AckTracker bookkeeping
1251 // and downstream pull-based streaming work without a forced
1252 // re-election.
1253 if self.is_master()
1254 && (node.node_type == crate::node_type::NodeType::Electable
1255 || node.node_type == crate::node_type::NodeType::Secondary)
1256 {
1257 let mut feeders = self.feeders.write();
1258 if !feeders.iter().any(|f| f.get_replica_name() == node.name) {
1259 feeders.push(Feeder::new(node.name.clone()));
1260 log::debug!(
1261 "Node '{}' (master): dispatched Feeder for new peer '{}'",
1262 self.config.node_name,
1263 node.name,
1264 );
1265 }
1266 }
1267 Ok(())
1268 }
1269
1270 /// Remove a peer node from the replication group by name.
1271 ///
1272 /// The node is deregistered from the `GroupService`. Elections initiated
1273 /// after this call will not include the removed node in quorum calculations.
1274 pub fn remove_peer(&self, name: &str) -> Result<()> {
1275 self.group_service.remove_node(name)?;
1276 log::info!(
1277 "Node '{}': removed peer '{}' from group '{}'",
1278 self.config.node_name,
1279 name,
1280 self.config.group_name,
1281 );
1282 Ok(())
1283 }
1284
1285 /// Update the capacity and latency metadata of an existing peer.
1286 ///
1287 /// Only the following fields are updated from `node`:
1288 /// - `read_capacity_pct`
1289 /// - `write_capacity_pct`
1290 /// - `latency_hint_ms`
1291 ///
1292 /// The node's identity (name, address, port, node_type) is preserved.
1293 /// Safe to call while replication is active.
1294 ///
1295 /// If the quorum policy is `Flexible` or `Expression`, the quorum system
1296 /// is rebuilt to reflect the new capacity/latency weights.
1297 ///
1298 /// # Note
1299 ///
1300 /// `update_peer_metadata` does not currently re-run
1301 /// `QuorumPolicy::validate(electable_count)` after the metadata
1302 /// change. An LP-optimal `Expression` quorum that was safe before
1303 /// the update may no longer satisfy the intersection property
1304 /// afterwards. Until automatic revalidation lands, deployments
1305 /// using `QuorumPolicy::Expression` should call
1306 /// `quorum_policy().validate(get_rep_group().electable_count())`
1307 /// on the returned `RepGroup` after every metadata change and
1308 /// fail the operator-facing operation if validation reports
1309 /// unsafety.
1310 pub fn update_peer_metadata(
1311 &self,
1312 name: &str,
1313 node: crate::rep_node::RepNode,
1314 ) -> Result<()> {
1315 self.group_service.update_node_metadata(
1316 name,
1317 node.read_capacity_pct,
1318 node.write_capacity_pct,
1319 node.latency_hint_ms,
1320 )?;
1321 log::info!(
1322 "Node '{}': updated metadata for peer '{}' \
1323 (read_cap={}, write_cap={}, latency={}ms)",
1324 self.config.node_name,
1325 name,
1326 node.read_capacity_pct,
1327 node.write_capacity_pct,
1328 node.latency_hint_ms,
1329 );
1330 Ok(())
1331 }
1332
1333 /// Returns a snapshot of the current replication group as a `RepGroup`.
1334 ///
1335 /// The snapshot reflects the state at the time of the call; subsequent
1336 /// `add_peer` / `remove_peer` calls are not reflected in it.
1337 pub fn get_rep_group(&self) -> crate::rep_group::RepGroup {
1338 use crate::rep_group::RepGroup;
1339
1340 let mut group = RepGroup::new(
1341 self.config.group_name.clone(),
1342 self.group_service.get_group_id(),
1343 );
1344 for info in self.group_service.get_all_nodes() {
1345 let mut node = crate::rep_node::RepNode::new(
1346 info.name.clone(),
1347 info.node_type,
1348 info.host.clone(),
1349 info.port,
1350 info.node_id,
1351 );
1352 node.read_capacity_pct = info.read_capacity_pct;
1353 node.write_capacity_pct = info.write_capacity_pct;
1354 node.latency_hint_ms = info.latency_hint_ms;
1355 group.add_node(node);
1356 }
1357 group
1358 }
1359
1360 /// Get the replication configuration.
1361 ///
1362 ///
1363 ///
1364 /// Returns the replication configuration that has been used to create this
1365 /// environment.
1366 pub fn get_config(&self) -> &RepConfig {
1367 &self.config
1368 }
1369
1370 /// Get the current VLSN range on this node.
1371 ///
1372 /// Returns the range of VLSNs currently available on this node.
1373 pub fn get_vlsn_range(&self) -> VlsnRange {
1374 self.vlsn_index.get_range()
1375 }
1376
1377 /// Get the latest VLSN.
1378 ///
1379 /// Returns the most recent VLSN registered on this node.
1380 pub fn get_current_vlsn(&self) -> u64 {
1381 self.vlsn_index.get_latest_vlsn()
1382 }
1383
1384 /// The replica-side replication stream state (master high-water, applied
1385 /// VLSN, lag). Used by the consistency read-gate to learn the master's
1386 /// latest known commit VLSN (JE `ConsistencyTracker.masterTxnEndVLSN`,
1387 /// updated by heartbeats).
1388 pub fn replica_stream(&self) -> &ReplicaStream {
1389 &self.replica_stream
1390 }
1391
1392 /// REP-10 (B): mint a [`CommitToken`] for the most recent commit on this
1393 /// master.
1394 ///
1395 /// Port of `MasterTxn.getCommitToken`: returns
1396 /// `new CommitToken(envUUID, commitVLSN.getSequence())`. A client that
1397 /// just performed a write on the master calls this to obtain the token it
1398 /// will hand to a subsequent replica read
1399 /// (`Transaction.getCommitToken`). Returns `None` on a non-master or when
1400 /// no commit VLSN exists yet (JE returns `null` when `commitVLSN.isNull`).
1401 ///
1402 /// The token's VLSN is the master's latest assigned VLSN — the same
1403 /// `wal_vlsn_counter` high-water the ack gate keys on (the commit was
1404 /// logged immediately before this call).
1405 pub fn commit_token(&self) -> Option<crate::CommitToken> {
1406 if !self.is_master() {
1407 return None;
1408 }
1409 let vlsn = self.wal_vlsn_counter.load(Ordering::Acquire);
1410 crate::CommitToken::new(self.config.group_name.clone(), vlsn)
1411 }
1412
1413 /// REP-10 (C): the read-gate. Enforce a replica read-consistency policy
1414 /// before a read transaction proceeds.
1415 ///
1416 /// Port of `ReplicaConsistencyPolicy.ensureConsistency` as invoked from a
1417 /// replica `beginTransaction` (`RepImpl.checkConsistency` /
1418 /// `Replica.getConsistencyTracker().awaitVLSN`). Called by the replica
1419 /// env's transaction-begin / read path.
1420 ///
1421 /// - `policy_override`: a per-transaction policy (JE
1422 /// `TransactionConfig.setConsistencyPolicy`). When `None`, the node's
1423 /// configured default is used (`ReplicationConfig.setConsistencyPolicy`
1424 /// — [`RepConfig::consistency_policy`]).
1425 ///
1426 /// On a master, or when the effective policy is
1427 /// [`ConsistencyPolicy::NoConsistency`], this returns immediately so
1428 /// existing behaviour is unchanged unless a policy is set. On a replica
1429 /// with a non-`NoConsistency` policy it BLOCKS until the replica has
1430 /// replayed far enough or the policy timeout expires (a clean
1431 /// [`RepError`], never a hang).
1432 pub fn begin_read_consistency(
1433 &self,
1434 policy_override: Option<&crate::ConsistencyPolicy>,
1435 ) -> Result<()> {
1436 // Resolve the effective policy: per-txn override else node default.
1437 let default_policy = self.config.consistency_policy.clone();
1438 let policy = policy_override.unwrap_or(&default_policy);
1439
1440 // NoConsistency never blocks (the master path also lands here).
1441 if matches!(policy, crate::ConsistencyPolicy::NoConsistency) {
1442 return Ok(());
1443 }
1444
1445 // A non-No policy only makes sense on a replica with a live replay
1446 // (its last_applied_vlsn is the wait predicate). Without a tracker
1447 // there is nothing to wait on — treat as immediately consistent
1448 // rather than block forever (e.g. on the master, which is by
1449 // definition fully current).
1450 let tracker = self.consistency_tracker.lock().unwrap().clone();
1451 let Some(tracker) = tracker else {
1452 return Ok(());
1453 };
1454
1455 // Surface the master's latest known VLSN for the time policy
1456 // (heartbeat / feeder high-water). JE ConsistencyTracker tracks this
1457 // via trackHeartbeat; here we read the replica_stream high-water.
1458 let master_vlsn = self.replica_stream.get_master_vlsn();
1459 if master_vlsn > 0 {
1460 tracker.set_master_vlsn(master_vlsn);
1461 }
1462
1463 tracker.await_consistency(policy)
1464 }
1465
1466 /// REP-10 (C) test seam: install a [`ConsistencyTracker`] over an existing
1467 /// `last_applied_vlsn` handle, exactly as `become_replica` does when it
1468 /// starts the live replay thread.
1469 ///
1470 /// Lets a test drive a real [`noxu_dbi::ReplicaReplay`] and exercise
1471 /// [`Self::begin_read_consistency`] end-to-end without standing up TCP
1472 /// feeder/receiver threads. Not part of the production API.
1473 #[cfg(any(test, feature = "test-harness"))]
1474 pub fn install_consistency_tracker_for_test(
1475 &self,
1476 last_applied_vlsn: std::sync::Arc<std::sync::atomic::AtomicU64>,
1477 ) -> crate::ConsistencyTracker {
1478 let tracker = crate::ConsistencyTracker::new(last_applied_vlsn);
1479 *self.consistency_tracker.lock().unwrap() = Some(tracker.clone());
1480 tracker
1481 }
1482
1483 /// REP-1 STEP 5 (D): run a live syncup against `feeder` and, if this
1484 /// replica's tail diverged, ROLL IT BACK to the common matchpoint instead
1485 /// of falling back to a network restore.
1486 ///
1487 /// Port of the replica's side of JE `ReplicaFeederSyncup.execute`:
1488 /// `findMatchpoint` → `verifyRollback` → `replay.rollback` →
1489 /// `vlsnIndex.truncateFromTail` → resume streaming from `matchpoint + 1`.
1490 ///
1491 /// `feeder` is the master's [`crate::stream::syncup::SyncupView`] (built
1492 /// from its VLSN index, or exchanged over the syncup wire protocol in
1493 /// [`crate::stream::syncup_protocol`]). The decision uses the same pure
1494 /// core the protocol drives: `find_matchpoint` + `verify_rollback`.
1495 ///
1496 /// Returns:
1497 /// - [`SyncupAction::RolledBack`] — the divergent tail was truncated to
1498 /// the matchpoint; resume streaming from `start_vlsn`. The non-diverged
1499 /// case (matchpoint == last VLSN) returns `RolledBack` with an empty
1500 /// tail and is a no-op rollback.
1501 /// - [`SyncupAction::NeedsRestore`] — `verify_rollback` selected
1502 /// NetworkRestore (no common matchpoint) or HardRecovery (the rollback
1503 /// would cross a committed/aborted txn); the caller must network-restore
1504 /// per JE.
1505 ///
1506 /// The non-diverged fast path (the replica's range is a prefix of the
1507 /// feeder's) is still served by the range-check `negotiate_syncup`
1508 /// (`SyncupResult::CanServe`) in the streaming path; this method is the
1509 /// DIVERGED case.
1510 pub fn syncup_with_feeder(
1511 &self,
1512 feeder: &dyn crate::stream::syncup::SyncupView,
1513 ) -> Result<SyncupAction> {
1514 // Build the replica's SyncupView. When a real LogManager is wired,
1515 // re-read the log (SyncupLogView) so the per-VLSN fingerprint is the
1516 // actual record checksum (JE ReplicaSyncupReader). Otherwise (the
1517 // VLSN-index-only harness model) fall back to the index view, whose
1518 // fingerprint is the LSN.
1519 let log_view: Option<crate::stream::syncup_reader::SyncupLogView> =
1520 self.env_impl.lock().unwrap().clone().and_then(|env| {
1521 if let Some(lm) = env.get_log_manager() {
1522 // Flush so all VLSN-tagged entries are on disk before the
1523 // backward re-read (JE flushNoSync in initScan).
1524 let _ = lm.flush_sync();
1525 }
1526 crate::stream::syncup_reader::SyncupLogView::scan(
1527 env.get_env_home(),
1528 )
1529 });
1530 let index_view = VlsnIndexView::from_index(&self.vlsn_index);
1531 let replica_view: &dyn crate::stream::syncup::SyncupView =
1532 match &log_view {
1533 Some(v) => v,
1534 None => &index_view,
1535 };
1536
1537 let range = self.vlsn_index.get_range();
1538 let last_sync = range.get_last_sync();
1539 let last_txn_end = range.get_last_txn_end();
1540 let to_vlsn = |v: u64| {
1541 if v == 0 {
1542 noxu_util::NULL_VLSN
1543 } else {
1544 noxu_util::Vlsn::new(v as i64)
1545 }
1546 };
1547
1548 // Step 1: find the matchpoint (JE findMatchpoint).
1549 let matchpoint = find_matchpoint(replica_view, feeder);
1550
1551 // numPassedCommits: count of txn ends strictly above the matchpoint.
1552 // When we re-read the log, count them exactly; otherwise rely on the
1553 // numeric `lastTxnEnd <= matchpoint` test in verify_rollback (which
1554 // matches JE when sync points == txn ends).
1555 let num_passed_commits = match (&log_view, &matchpoint) {
1556 (Some(v), Matchpoint::Found { vlsn, .. }) => {
1557 v.num_passed_commits(*vlsn)
1558 }
1559 _ => 0,
1560 };
1561 let decision = verify_rollback(
1562 &matchpoint,
1563 to_vlsn(last_txn_end),
1564 to_vlsn(last_sync),
1565 num_passed_commits,
1566 );
1567
1568 match decision {
1569 RollbackDecision::RollbackToMatchpoint {
1570 matchpoint_vlsn,
1571 start_vlsn,
1572 } => {
1573 let matchpoint_lsn = match &matchpoint {
1574 Matchpoint::Found { lsn, .. } => *lsn,
1575 Matchpoint::None => 0,
1576 };
1577 // Collect the rolled-back LSNs (VLSNs strictly above the
1578 // matchpoint). When the real log was re-read, use its EXACT
1579 // per-VLSN LSNs so make-invisible flips the right header bytes
1580 // (the sparse VLSN index only stores boundary/last LSNs).
1581 let mp = matchpoint_vlsn.sequence().max(0) as u64;
1582 let rollback_lsns: Vec<noxu_util::Lsn> = match &log_view {
1583 Some(v) => v
1584 .entries()
1585 .filter(|(vlsn, _)| (vlsn.sequence() as u64) > mp)
1586 .map(|(_, e)| noxu_util::Lsn::from_u64(e.lsn))
1587 .collect(),
1588 None => self
1589 .vlsn_index
1590 .snapshot_entries()
1591 .into_iter()
1592 .filter(|(vlsn, _, _)| *vlsn > mp)
1593 .map(|(_, file, offset)| {
1594 noxu_util::Lsn::new(file, offset)
1595 })
1596 .collect(),
1597 };
1598 self.execute_rollback(mp, matchpoint_lsn, &rollback_lsns)?;
1599 Ok(SyncupAction::RolledBack {
1600 matchpoint_vlsn: mp,
1601 start_vlsn: start_vlsn.sequence().max(0) as u64,
1602 })
1603 }
1604 RollbackDecision::HardRecovery { .. }
1605 | RollbackDecision::NetworkRestore => {
1606 Ok(SyncupAction::NeedsRestore)
1607 }
1608 }
1609 }
1610
1611 /// Execute the durable + in-memory rollback to `matchpoint_vlsn`
1612 /// (LSN `matchpoint_lsn`). Port of JE `Replay.rollback` +
1613 /// `vlsnIndex.truncateFromTail`.
1614 ///
1615 /// Durable steps (RollbackStart/End + make-invisible + fsync) go through
1616 /// [`noxu_recovery::rollback`] when a `LogManager` is wired; the VLSN index
1617 /// is always truncated to the matchpoint so the reported range matches the
1618 /// rolled-back state and streaming resumes from `matchpoint + 1`.
1619 fn execute_rollback(
1620 &self,
1621 matchpoint_vlsn: u64,
1622 matchpoint_lsn: u64,
1623 rollback_lsns: &[noxu_util::Lsn],
1624 ) -> Result<()> {
1625 // Durable rollback (RollbackStart … make-invisible … RollbackEnd) when
1626 // a live LogManager is available. The harness-level env (VLSN-index
1627 // only, no LogManager) skips the on-disk steps; the index truncation
1628 // below is what makes the replica converge in that model.
1629 if let Some(env) = self.env_impl.lock().unwrap().clone()
1630 && let Some(log_mgr) = env.get_log_manager()
1631 && matchpoint_lsn != 0
1632 {
1633 let mp_lsn = noxu_util::Lsn::from_u64(matchpoint_lsn);
1634 // active_txn_ids: the harness/VLSN-index model has no live txn
1635 // table here; the durable RollbackStart records an empty set, and
1636 // the per-txn gating (REP-1 STEP 2) applies during recovery when
1637 // the analysis pass rebuilds the active set. A future pass can
1638 // thread the live ReplayTxn ids through (JE
1639 // localActiveTxns.keySet()).
1640 noxu_recovery::rollback(
1641 &log_mgr,
1642 noxu_util::Vlsn::new(matchpoint_vlsn as i64),
1643 mp_lsn,
1644 Vec::new(),
1645 rollback_lsns,
1646 )
1647 .map_err(|e| {
1648 RepError::DatabaseError(format!(
1649 "live rollback to matchpoint failed: {e}"
1650 ))
1651 })?;
1652 }
1653
1654 // JE vlsnIndex.truncateFromTail(startVLSN, matchpointLSN): drop the
1655 // divergent VLSN tail so the reported range matches the recovered
1656 // state and streaming resumes from matchpoint + 1.
1657 self.vlsn_index.truncate_after(matchpoint_vlsn);
1658
1659 log::info!(
1660 "Node '{}': live syncup rolled back to matchpoint vlsn={} \
1661 (lsn={:#x}); {} tail entries truncated",
1662 self.config.node_name,
1663 matchpoint_vlsn,
1664 matchpoint_lsn,
1665 rollback_lsns.len(),
1666 );
1667 Ok(())
1668 }
1669
1670 /// Test-only: clone the env's SHARED VLSN index `Arc`.
1671 ///
1672 /// REP-6: the replica receive loop (`become_replica` ->
1673 /// `EnvironmentLogWriter`) must feed THIS index — the one
1674 /// `get_vlsn_range`, `flush_to_disk`, and election ranking read — not a
1675 /// throwaway. Tests use this to build a writer the same way
1676 /// `become_replica` does and assert the shared index advances.
1677 #[cfg(feature = "test-harness")]
1678 pub fn vlsn_index_arc(&self) -> Arc<crate::vlsn::vlsn_index::VlsnIndex> {
1679 Arc::clone(&self.vlsn_index)
1680 }
1681
1682 /// Return the list of replica names that currently have a `Feeder`
1683 /// tracker on this (master) node.
1684 ///
1685 /// Used by tests and operator tooling. The returned list reflects
1686 /// the master's view at the time of the call; subsequent
1687 /// `add_peer`/`remove_peer` calls may change it.
1688 pub fn feeder_replica_names(&self) -> Vec<String> {
1689 self.feeders.read().iter().map(|f| f.get_replica_name()).collect()
1690 }
1691
1692 // -----------------------------------------------------------------------
1693 // C-C2 — active push feeder API
1694 // -----------------------------------------------------------------------
1695
1696 /// Register a channel for pushing log entries to a specific replica.
1697 ///
1698 /// When [`Self::become_master`] is called — or if the node is **already
1699 /// master** — a [`FeederRunner`] background thread is immediately spawned
1700 /// for this channel. The thread reads from a dedicated in-memory queue
1701 /// that is fed by [`Self::replicate_entry`] / [`Self::apply_entry`], and
1702 /// sends framed log entries to the replica over `channel`. Acks sent
1703 /// back by the replica are visible via
1704 /// [`Self::active_feeder_runner_acked_vlsn`].
1705 ///
1706 /// # Production vs. test use
1707 ///
1708 /// *Production*: pass a [`crate::net::TcpChannel`] connected to the
1709 /// replica's inbound feeder service.
1710 /// *Tests*: pass one half of a [`crate::net::LocalChannelPair`].
1711 ///
1712 /// # Note on push vs. pull
1713 ///
1714 /// Registering a channel activates the **push** path: the master
1715 /// initiates and owns the feeder connection. The existing **pull** path
1716 /// (`PeerFeederService` / `catch_up_from_peer`) continues to operate in
1717 /// parallel for replicas that connect proactively. Do not register a
1718 /// channel for a replica that already connects via the pull path, or
1719 /// entries may be delivered twice.
1720 ///
1721 /// If `become_master` was called *before* registering the channel, call
1722 /// this method afterward; it will spawn the FeederRunner immediately.
1723 pub fn register_feeder_channel(
1724 &self,
1725 replica_name: String,
1726 channel: Arc<dyn crate::net::Channel>,
1727 ) {
1728 {
1729 let mut ch = self.feeder_channels.lock().unwrap();
1730 ch.insert(replica_name.clone(), Arc::clone(&channel));
1731 }
1732 if self.is_master() {
1733 self.spawn_feeder_runner(replica_name, channel);
1734 }
1735 }
1736
1737 /// Return the last VLSN acknowledged by the FeederRunner for `replica_name`.
1738 ///
1739 /// Returns `0` if no FeederRunner is currently active for that replica
1740 /// (either `become_master` was not called yet, or no channel was
1741 /// registered). Use this to poll catch-up progress before shutdown.
1742 pub fn active_feeder_runner_acked_vlsn(&self, replica_name: &str) -> u64 {
1743 self.active_feeder_runners
1744 .lock()
1745 .unwrap()
1746 .get(replica_name)
1747 .map(|r| r.known_replica_vlsn())
1748 .unwrap_or(0)
1749 }
1750
1751 /// Spawn a FeederRunner thread for `replica_name` using `channel`.
1752 ///
1753 /// Creates a dedicated `PeerLogScanner` queue for the replica, registers
1754 /// it in `feeder_queues` so that future `replicate_entry` / `apply_entry`
1755 /// calls fan out into it, spawns the `FeederRunner::run` loop, and
1756 /// records the `Arc<FeederRunner>` in `active_feeder_runners`.
1757 ///
1758 /// Idempotent: if a FeederRunner is already active for `replica_name`
1759 /// (from a prior `become_master` call), it is replaced — the old channel
1760 /// should have been closed already via `close()`.
1761 ///
1762 /// **WAL-scanner auto-feed path (C-C2b)**: when a live `EnvironmentImpl`
1763 /// has been wired via `with_environment`, the FeederRunner thread uses an
1764 /// `EnvironmentLogScanner` as its source. Every `log_txn_commit` on the
1765 /// master writes a VLSN-tagged WAL entry (22-byte header); the scanner
1766 /// finds these entries and streams them to the replica automatically,
1767 /// without any `replicate_entry` call from the application.
1768 ///
1769 /// **Fallback path**: when no `EnvironmentImpl` is wired the runner reads
1770 /// from the in-memory `PeerLogScanner` queue populated by
1771 /// `replicate_entry` / `apply_entry` — the previous manual behaviour.
1772 fn spawn_feeder_runner(
1773 &self,
1774 replica_name: String,
1775 channel: Arc<dyn crate::net::Channel>,
1776 ) {
1777 // Dedicated entry queue: entries flowing from this master reach the
1778 // FeederRunner without competing with PeerFeederService.
1779 let queue = Arc::new(PeerLogScanner::new());
1780 {
1781 self.feeder_queues
1782 .write()
1783 .unwrap()
1784 .insert(replica_name.clone(), Arc::clone(&queue));
1785 }
1786
1787 // REP-9 Part 1: wire an ack sink so the FeederRunner forwards every
1788 // inbound replica ack to `env.record_ack(vlsn, replica_name)`, which
1789 // reaches BOTH the AckTracker (commit-blocking quorum) and the
1790 // matching `Feeder::acked_vlsn` (DTVLSN ranking). Without this the
1791 // ack reached only the runner's private `known_replica_vlsn`. The
1792 // sink holds a `Weak<Self>` so it never extends the env's lifetime;
1793 // if `self_weak` was never initialised we fall back to the plain
1794 // (sink-less) runner — `record_ack` is still reachable from tests.
1795 let runner = match self.self_weak.get().and_then(Weak::upgrade) {
1796 Some(env_arc) => {
1797 let weak = Arc::downgrade(&env_arc);
1798 let sink: crate::stream::feeder::AckSink =
1799 Arc::new(move |name: &str, vlsn: u64| {
1800 if let Some(env) = weak.upgrade() {
1801 env.record_ack(vlsn, name);
1802 }
1803 });
1804 Arc::new(FeederRunner::new_with_ack_sink(
1805 Arc::clone(&channel),
1806 1,
1807 replica_name.clone(),
1808 sink,
1809 ))
1810 }
1811 None => Arc::new(FeederRunner::new(Arc::clone(&channel), 1)),
1812 };
1813 let runner_clone = Arc::clone(&runner);
1814 let replica_clone = replica_name.clone();
1815
1816 // C-C2b: prefer EnvironmentLogScanner (WAL auto-feed) when env is
1817 // wired; fall back to in-memory queue (manual replicate_entry path)
1818 // otherwise.
1819 let env_opt = self.env_impl.lock().unwrap().clone();
1820
1821 let handle = std::thread::Builder::new()
1822 .name(format!("noxu-feeder-{}", replica_name))
1823 .spawn(move || {
1824 if let Some(env) = env_opt {
1825 if let Some(mut scanner) =
1826 EnvironmentLogScanner::new(&env, None)
1827 {
1828 log::info!(
1829 "FeederRunner for replica '{}': using \
1830 EnvironmentLogScanner (WAL auto-feed)",
1831 replica_clone,
1832 );
1833 let _ = runner_clone.run(&mut scanner);
1834 } else {
1835 log::warn!(
1836 "FeederRunner for replica '{}': \
1837 EnvironmentLogScanner unavailable, \
1838 falling back to in-memory queue",
1839 replica_clone,
1840 );
1841 let mut source = PeerScannerAdapter::new(queue, 0);
1842 let _ = runner_clone.run(&mut source);
1843 }
1844 } else {
1845 let mut source = PeerScannerAdapter::new(queue, 0);
1846 let _ = runner_clone.run(&mut source);
1847 }
1848 log::debug!(
1849 "FeederRunner for replica '{}' exited cleanly",
1850 replica_clone
1851 );
1852 })
1853 .expect("failed to spawn FeederRunner thread");
1854
1855 {
1856 let mut runners = self.active_feeder_runners.lock().unwrap();
1857 runners.insert(replica_name.clone(), Arc::clone(&runner));
1858 }
1859 self.io_threads.lock().unwrap().push(handle);
1860
1861 log::info!(
1862 "Node '{}' (master): FeederRunner thread spawned for replica '{}'",
1863 self.config.node_name.as_str(),
1864 replica_name,
1865 );
1866 }
1867
1868 // -----------------------------------------------------------------------
1869
1870 /// Bootstrap this node's environment by network-restoring all `.ndb`
1871 /// files from `peer_name` via the dispatcher's RESTORE service.
1872 ///
1873 /// Closes findings F2 / F4 of the 2026 review.
1874 ///
1875 /// The standalone `NetworkRestore::execute()` opens raw TCP and
1876 /// expects to drive the legacy `NetworkRestoreServer::start` listener.
1877 /// Production replicated environments host the RESTORE handler on the
1878 /// dispatcher, so this method routes through `execute_via_dispatcher`.
1879 ///
1880 /// `peer_name` must be a known peer in `GroupService`; on success the
1881 /// peer's `.ndb` files are written into `config.env_home`. Returns
1882 /// `Err` if `env_home` is `None`, the peer is unknown, or the restore
1883 /// fails for any reason.
1884 pub fn bootstrap_via_dispatcher(&self, peer_name: &str) -> Result<()> {
1885 let env_home = self.config.env_home.clone().ok_or_else(|| {
1886 RepError::ConfigError(
1887 "bootstrap_via_dispatcher requires env_home in RepConfig"
1888 .into(),
1889 )
1890 })?;
1891 let peer_info = self
1892 .group_service
1893 .get_all_nodes()
1894 .into_iter()
1895 .find(|n| n.name == peer_name)
1896 .ok_or_else(|| {
1897 RepError::ConfigError(format!(
1898 "peer '{}' not registered in group '{}'",
1899 peer_name, self.config.group_name,
1900 ))
1901 })?;
1902
1903 let cfg = NetworkRestoreConfig {
1904 source_node: peer_info.name.clone(),
1905 source_host: peer_info.host.clone(),
1906 source_port: peer_info.port,
1907 retain_log_files: true,
1908 };
1909 let restore = NetworkRestore::new(cfg).with_local_dir(env_home);
1910 restore.execute_via_dispatcher()?;
1911 log::info!(
1912 "Node '{}' bootstrapped via dispatcher from '{}' ({}:{})",
1913 self.config.node_name,
1914 peer_info.name,
1915 peer_info.host,
1916 peer_info.port,
1917 );
1918 Ok(())
1919 }
1920
1921 /// Get replication statistics.
1922 ///
1923 ///
1924 ///
1925 /// Returns statistics associated with this environment.
1926 pub fn get_stats(&self) -> &RepStats {
1927 &self.stats
1928 }
1929
1930 /// Get the ack tracker.
1931 pub fn get_ack_tracker(&self) -> &AckTracker {
1932 &self.ack_tracker
1933 }
1934
1935 /// Ensure the node state machine is in Unknown state, transitioning
1936 /// from Detached if necessary. This is needed because the state machine
1937 /// only allows Detached -> Unknown -> Master/Replica.
1938 pub fn ensure_unknown_state(&self) -> Result<()> {
1939 let current = self.node_state.get_state();
1940 match current {
1941 NodeState::Unknown => Ok(()),
1942 NodeState::Detached => {
1943 self.node_state.transition_to(NodeState::Unknown)?;
1944 Ok(())
1945 }
1946 // Master and Replica must transition through Unknown before
1947 // joining a new group or reconnecting.
1948 NodeState::Master | NodeState::Replica => {
1949 self.node_state.transition_to(NodeState::Unknown)?;
1950 Ok(())
1951 }
1952 NodeState::Shutdown => {
1953 Err(RepError::StateError("Node is shut down".to_string()))
1954 }
1955 }
1956 }
1957
    /// Transition to master state.
    ///
    /// Transitions this node to Master state for the given election term.
    /// As master, the node can accept write operations and feed log entries
    /// to replicas.
    ///
    /// **Active push-feeder** (C-C2): if feeder channels have been registered
    /// via [`Self::register_feeder_channel`] before this call, a
    /// [`FeederRunner`] background thread is spawned per channel.
    ///
    /// **WAL-scanner auto-feed path (C-C2b, v3.3.0)**: when
    /// [`Self::with_environment`] has been called before `become_master`,
    /// each `FeederRunner` thread uses an [`EnvironmentLogScanner`] as its
    /// source. Every `log_txn_commit` on the master writes a VLSN-tagged
    /// 22-byte WAL entry (via `LogManager::log_with_vlsn`). The scanner
    /// discovers these entries and streams them to replicas automatically,
    /// without any [`Self::replicate_entry`] call from the application.
    ///
    /// **Fallback path**: when no `EnvironmentImpl` is wired, the runner
    /// reads from the in-memory queue populated by [`Self::replicate_entry`] /
    /// [`Self::apply_entry`].
    ///
    /// In every case this call rebuilds the per-replica `Feeder` tracker
    /// list (Electable and Secondary peers only) used for `AckTracker`
    /// bookkeeping. If no feeder channels are registered, replicas must
    /// connect proactively to the `PEER_FEEDER` pull service to receive
    /// entries.
    ///
    /// # Errors
    ///
    /// * `RepError::StateError` if the environment is closed.
    /// * `RepError::InvalidStateTransition` if this node's type is not
    ///   electable (`NodeType::can_be_master` returns false).
    /// * Any error from [`Self::ensure_unknown_state`] or from the
    ///   transition to `Master`.
    ///
    /// NOTE(review): each call spawns a runner for every channel stored in
    /// `feeder_channels`, and `spawn_feeder_runner` does not stop a runner
    /// already registered for the same replica. Re-entering this method
    /// while runners from an earlier term are alive may leave two runners
    /// writing to the same channel. Confirm callers stop old runners first.
    pub fn become_master(&self, term: u64) -> Result<()> {
        if self.is_shutdown() {
            return Err(RepError::StateError(
                "Cannot become master: environment is closed".to_string(),
            ));
        }

        // JE invariant: only `Electable` nodes can become master. `Secondary`,
        // `Monitor`, and `Arbiter` are not electable and must be rejected at
        // the API layer (mirrors JE `ExceptionTest`). See
        // `NodeType::can_be_master`.
        if !self.config.node_type.can_be_master() {
            return Err(RepError::InvalidStateTransition(format!(
                "node '{}' has type {} which is not electable as master",
                self.config.node_name.as_str(),
                self.config.node_type,
            )));
        }

        // Ensure we can reach Master state (may need Detached -> Unknown first)
        self.ensure_unknown_state()?;

        // Read AFTER `ensure_unknown_state`, so on success this is normally
        // `Unknown`. Listeners are therefore told about the final
        // `Unknown -> Master` step, not the state the node started in.
        let old_state = self.node_state.get_state();
        self.node_state.transition_to(NodeState::Master)?;
        self.master_tracker.set_master(self.config.node_name.as_str(), term);

        // --- F9: spawn Feeder trackers for each known replica -------------
        //
        // Closes finding F9 of the 2026 review.
        // The architecture is pull-based: replicas pull from the master's
        // `PEER_FEEDER` service via `catch_up_from_peer`. However, the
        // master must:
        //   1. Track each replica via a `Feeder` so AckTracker bookkeeping
        //      can attribute replica acks to the right node.
        //   2. Push its own writes into `peer_scanner` so replicas pulling
        //      from `PEER_FEEDER` actually receive entries (`replicate_entry`).
        //
        // Here we ensure step 1: every known Electable or Secondary peer in
        // the group (other than this node) gets a `Feeder` entry.
        {
            let mut feeders = self.feeders.write();
            // Drop any stale feeders left over from a prior role. A
            // `Feeder` is just an in-memory tracker; recreating it is
            // cheap and avoids state inversion bugs across role changes.
            feeders.clear();
            for peer in self.group_service.get_all_nodes() {
                if peer.name == self.config.node_name {
                    continue;
                }
                if peer.node_type != crate::node_type::NodeType::Electable
                    && peer.node_type != crate::node_type::NodeType::Secondary
                {
                    // Only Electable and Secondary peers are fed. Other
                    // node types (e.g. Arbiters, Monitors) do not receive
                    // log entries.
                    continue;
                }
                feeders.push(Feeder::new(peer.name.clone()));
                log::debug!(
                    "Node '{}' (master, term={}): registered Feeder for \
                     replica '{}'",
                    self.config.node_name.as_str(),
                    term,
                    peer.name,
                );
            }
        }

        // For observability, log the count.
        log::info!(
            "Node '{}' became master for term {} \
             (feeder trackers: {} known replicas)",
            self.config.node_name.as_str(),
            term,
            self.feeders.read().len(),
        );

        // C-C2: spawn FeederRunner threads for pre-registered channels.
        //
        // When `register_feeder_channel` was called before `become_master`,
        // the channels are already in `feeder_channels`. They are copied out
        // (not drained; the map keeps them for later terms) and a
        // FeederRunner is spawned per replica. Each runner streams either
        // from the WAL (when an `EnvironmentImpl` is wired) or from a
        // dedicated `PeerLogScanner` queue populated by `replicate_entry`
        // fan-out, and pushes framed log entries to the replica over the
        // registered channel. Acks from the replica are tracked in the
        // FeederRunner and visible via `active_feeder_runner_acked_vlsn`.
        // The snapshot is collected first so the `feeder_channels` lock is
        // not held while runners are spawned.
        {
            let channels: Vec<(String, Arc<dyn crate::net::Channel>)> = self
                .feeder_channels
                .lock()
                .unwrap()
                .iter()
                .map(|(k, v)| (k.clone(), Arc::clone(v)))
                .collect();
            for (replica_name, channel) in channels {
                self.spawn_feeder_runner(replica_name, channel);
            }
        }

        // -------------------------------------------------------------------

        // Notify listeners
        self.notify_listeners(old_state, NodeState::Master);

        Ok(())
    }
2088
2089 /// Transition to replica state with the given master.
2090 ///
2091 /// Transitions this node to Replica state. The node will receive log
2092 /// entries from the specified master.
2093 ///
2094 /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
2095 /// the method prepares an `EnvironmentLogWriter` so that replicated
2096 /// entries can be written to the local log. The actual network connection
2097 /// is established by the `TcpServiceDispatcher`; this method logs intent.
2098 ///
2099 /// In HA.
2100 pub fn become_replica(&self, master_name: &str) -> Result<()> {
2101 if self.is_shutdown() {
2102 return Err(RepError::StateError(
2103 "Cannot become replica: environment is closed".to_string(),
2104 ));
2105 }
2106
2107 // Ensure we can reach Replica state (may need Detached -> Unknown first)
2108 self.ensure_unknown_state()?;
2109
2110 let old_state = self.node_state.get_state();
2111 self.node_state.transition_to(NodeState::Replica)?;
2112 self.master_tracker.set_master(master_name, 0);
2113 self.replica_stream.set_master(master_name);
2114 self.replica_stream.set_state(
2115 crate::stream::replica_stream::ReplicaStreamState::Connecting,
2116 );
2117
2118 // --- G19: start replica receive loop --------------------------------
2119 //
2120 // Connects to the master's PEER_FEEDER service and runs a
2121 // ReplicaReceiver loop in a background thread. The receiver writes
2122 // replicated entries via EnvironmentLogWriter.
2123 if let Some(env) = self.env_impl.lock().unwrap().clone() {
2124 if let Some(log_mgr) = env.get_log_manager() {
2125 // REP-6: feed the env's SHARED, persisted VLSN index (the one
2126 // flush_to_disk persists and get_vlsn_range / election ranking
2127 // read) into the replica receive loop — NOT a throwaway. Using
2128 // a fresh index would leave the persisted vlsn.idx, the
2129 // reported VLSN range, and the DTVLSN-ranking own_vlsn lagging
2130 // the actually-received stream, widening catch-up (or forcing
2131 // an unnecessary network restore) after a clean restart.
2132 // JE: the replica's VLSNIndex IS the environment's persisted
2133 // index (see VLSNIndex).
2134 let vlsn_index = Arc::clone(&self.vlsn_index);
2135
2136 // Resolve the master's socket address from the GroupService.
2137 let master_addr_opt: Option<SocketAddr> = self
2138 .group_service
2139 .get_all_nodes()
2140 .iter()
2141 .find(|n| n.name == master_name)
2142 .and_then(|info| {
2143 format!("{}:{}", info.host, info.port)
2144 .parse::<SocketAddr>()
2145 .ok()
2146 });
2147
2148 let node_name = self.config.node_name.clone();
2149 let master = master_name.to_string();
2150 let vlsn_index_clone = Arc::clone(&vlsn_index);
2151 let shutdown_flag = self.io_shutdown.load(Ordering::SeqCst);
2152 // Wave 9-A fix 2: capture a Weak<Self> so the I/O thread
2153 // can call `bootstrap_via_dispatcher` automatically when
2154 // the master signals `NeedsRestore`. When the env was
2155 // never registered with `init_self_weak` (raw
2156 // `Arc::new(Self::new(...))` without going through
2157 // `open()` or the test harness), the weak ref is `None`
2158 // and we fall back to operator-driven bootstrap.
2159 let self_weak: Option<Weak<Self>> =
2160 self.self_weak.get().cloned();
2161
2162 // REP-7 (B): clone the live EnvironmentImpl into the replica
2163 // thread so the writer can drive a ReplicaReplay that applies
2164 // each streamed entry to the live in-memory tree.
2165 let env_for_replay = Arc::clone(&env);
2166
2167 // REP-10 (C): build the ReplicaReplay HERE (not inside the
2168 // closure) so we can publish its REP-7 `last_applied_vlsn`
2169 // handle to a ConsistencyTracker BEFORE the thread starts
2170 // streaming. A read on this replica then waits on the same
2171 // handle the replay thread advances. Port of
2172 // RepImpl.getConsistency / Replica.getConsistencyTracker.
2173 let replay = noxu_dbi::ReplicaReplay::new(env_for_replay);
2174 let tracker = crate::ConsistencyTracker::new(
2175 replay.last_applied_vlsn_handle(),
2176 );
2177 *self.consistency_tracker.lock().unwrap() = Some(tracker);
2178
2179 let handle = std::thread::Builder::new()
2180 .name(format!("noxu-replica-{}", node_name))
2181 .spawn(move || {
2182 // REP-7 (B): wire the live replay-apply path so reads
2183 // on the replica see replicated data without a
2184 // restart. JE: the replica writes each entry to its
2185 // log, then Replay.replayEntry applies it to the tree.
2186 let mut writer = EnvironmentLogWriter::with_replay(
2187 log_mgr,
2188 vlsn_index_clone,
2189 replay,
2190 );
2191
2192 let Some(addr) = master_addr_opt else {
2193 log::warn!(
2194 "noxu-replica-{}: master '{}' address not in RepGroup; \
2195 waiting for TCP dispatcher connection",
2196 node_name, master,
2197 );
2198 return;
2199 };
2200
2201 // Catch-up loop: catch up, observe NeedsRestore,
2202 // optionally auto-bootstrap, retry once. We cap
2203 // the retry count at MAX_AUTO_BOOTSTRAP_ATTEMPTS
2204 // (small) so a misbehaving master does not loop
2205 // forever consuming network bandwidth.
2206 const MAX_AUTO_BOOTSTRAP_ATTEMPTS: u32 = 2;
2207 let mut attempts: u32 = 0;
2208 loop {
2209 log::info!(
2210 "noxu-replica-{}: connecting to master '{}' at {}",
2211 node_name, master, addr,
2212 );
2213 match crate::stream::peer_feeder::catch_up_from_peer(
2214 addr, 0, &mut writer,
2215 ) {
2216 Ok(true) => {
2217 log::info!(
2218 "noxu-replica-{}: catch-up complete from '{}'",
2219 node_name, master,
2220 );
2221 return;
2222 }
2223 Ok(false) => {
2224 // F2/F4: master signals NeedsRestore.
2225 // Wave 9-A fix 2: if a Weak<Self> was
2226 // plumbed in, upgrade it and call
2227 // `bootstrap_via_dispatcher` ourselves
2228 // so the replica auto-bootstraps and
2229 // resumes catch-up without operator
2230 // intervention.
2231 log::warn!(
2232 "noxu-replica-{}: master '{}' requires restore",
2233 node_name, master,
2234 );
2235 attempts += 1;
2236 if attempts > MAX_AUTO_BOOTSTRAP_ATTEMPTS {
2237 log::error!(
2238 "noxu-replica-{}: exceeded \
2239 auto-bootstrap attempts ({}); giving up",
2240 node_name,
2241 MAX_AUTO_BOOTSTRAP_ATTEMPTS,
2242 );
2243 return;
2244 }
2245 let env_arc = match self_weak
2246 .as_ref()
2247 .and_then(Weak::upgrade)
2248 {
2249 Some(e) => e,
2250 None => {
2251 // No back-ref or env dropped:
2252 // fall back to operator-driven
2253 // bootstrap and exit cleanly.
2254 log::warn!(
2255 "noxu-replica-{}: no back-reference \
2256 available; operator must call \
2257 bootstrap_via_dispatcher manually",
2258 node_name,
2259 );
2260 return;
2261 }
2262 };
2263 if env_arc.is_shutdown() {
2264 return;
2265 }
2266 log::info!(
2267 "noxu-replica-{}: auto-bootstrapping via \
2268 dispatcher from '{}' (attempt {})",
2269 node_name, master, attempts,
2270 );
2271 match env_arc
2272 .bootstrap_via_dispatcher(&master)
2273 {
2274 Ok(()) => {
2275 log::info!(
2276 "noxu-replica-{}: auto-bootstrap \
2277 succeeded; resuming catch-up",
2278 node_name,
2279 );
2280 // Drop the strong ref before
2281 // re-entering catch-up so we
2282 // do not keep the env alive
2283 // longer than necessary.
2284 drop(env_arc);
2285 continue;
2286 }
2287 Err(e) => {
2288 log::error!(
2289 "noxu-replica-{}: auto-bootstrap \
2290 failed: {}",
2291 node_name, e,
2292 );
2293 return;
2294 }
2295 }
2296 }
2297 Err(e) => {
2298 if !shutdown_flag {
2299 log::error!(
2300 "noxu-replica-{}: error from master '{}': {e}",
2301 node_name, master,
2302 );
2303 }
2304 return;
2305 }
2306 }
2307 }
2308 })
2309 .expect("failed to spawn noxu-replica thread");
2310
2311 self.io_threads.lock().unwrap().push(handle);
2312
2313 log::debug!(
2314 "Node '{}': replica receive thread started for master '{}'",
2315 self.config.node_name.as_str(),
2316 master_name,
2317 );
2318 } else {
2319 log::warn!(
2320 "Node '{}': no LogManager available (read-only env?); \
2321 replica I/O loop not started",
2322 self.config.node_name.as_str(),
2323 );
2324 }
2325 }
2326 // -------------------------------------------------------------------
2327
2328 // Notify listeners
2329 self.notify_listeners(old_state, NodeState::Replica);
2330
2331 log::info!(
2332 "Node '{}' became replica of master '{}'",
2333 self.config.node_name.as_str(),
2334 master_name
2335 );
2336 Ok(())
2337 }
2338
2339 /// Initiate a master transfer to the target node.
2340 ///
2341 ///
2342 ///
2343 /// Transfers the current master state from this node to one of the
2344 /// electable replicas. The replica that is actually chosen to be the new
2345 /// master is the one with which the Master Transfer can be completed most
2346 /// rapidly. The transfer operation ensures that all changes at this node
2347 /// are available at the new master upon conclusion of the operation.
2348 pub fn transfer_master(&self, config: MasterTransferConfig) -> Result<()> {
2349 if self.is_shutdown() {
2350 return Err(RepError::StateError(
2351 "Cannot transfer master: environment is closed".to_string(),
2352 ));
2353 }
2354
2355 if !self.is_master() {
2356 return Err(RepError::InvalidState(
2357 "Master transfer can only be initiated on the master node"
2358 .to_string(),
2359 ));
2360 }
2361
2362 log::info!(
2363 "Node '{}' initiating master transfer to '{}'",
2364 self.config.node_name.as_str(),
2365 config.target_node,
2366 );
2367
2368 // Closes finding F7 of the 2026 review.
2369 //
2370 // Steps:
2371 // 1. Locate the target's address.
2372 // 2. Compute the new term (current observed term + 1).
2373 // 3. Send TRANSFER_MASTER to the target — it will become master.
2374 // 4. Send TRANSFER_MASTER (with the same term + new master name) to
2375 // every other peer so they re-target.
2376 // 5. Demote self to Replica of the target.
2377 //
2378 // The transfer is best-effort: a peer that doesn't ack is logged
2379 // and skipped. The election driver will reconcile any divergence
2380 // on the next election round.
2381
2382 let target_addr = self
2383 .group_service
2384 .get_all_nodes()
2385 .into_iter()
2386 .find(|n| n.name == config.target_node)
2387 .and_then(|n| {
2388 format!("{}:{}", n.host, n.port)
2389 .parse::<std::net::SocketAddr>()
2390 .ok()
2391 })
2392 .ok_or_else(|| {
2393 RepError::ConfigError(format!(
2394 "transfer_master: target '{}' not registered or has bad address",
2395 config.target_node
2396 ))
2397 })?;
2398
2399 let new_term = self.master_tracker.get_term().saturating_add(1);
2400
2401 // 1. Tell the target to become master at the new term.
2402 let target_ack = crate::group_admin::send_transfer_master(
2403 target_addr,
2404 &config.target_node,
2405 new_term,
2406 )
2407 .map_err(|e| {
2408 RepError::NetworkError(format!(
2409 "transfer_master: failed to signal target '{}': {}",
2410 config.target_node, e
2411 ))
2412 })?;
2413 if !target_ack {
2414 return Err(RepError::StateError(format!(
2415 "transfer_master: target '{}' rejected the transfer",
2416 config.target_node
2417 )));
2418 }
2419
2420 // 2. Inform all other peers (best-effort).
2421 for peer in self.group_service.get_all_nodes() {
2422 if peer.name == self.config.node_name
2423 || peer.name == config.target_node
2424 {
2425 continue;
2426 }
2427 if let Ok(addr) = format!("{}:{}", peer.host, peer.port).parse() {
2428 let _ = crate::group_admin::send_transfer_master(
2429 addr,
2430 &config.target_node,
2431 new_term,
2432 );
2433 }
2434 }
2435
2436 // 3. Demote self to Replica of the new master.
2437 self.become_replica(&config.target_node)?;
2438
2439 log::info!(
2440 "Node '{}' transferred master to '{}' at term {}",
2441 self.config.node_name.as_str(),
2442 config.target_node,
2443 new_term,
2444 );
2445 Ok(())
2446 }
2447
2448 /// Register a VLSN (as master, after writing a log entry).
2449 ///
2450 /// Maps the given VLSN to the specified log file position. This is called
2451 /// by the master after it writes a replicated log entry.
2452 pub fn register_vlsn(&self, vlsn: u64, file_number: u32, file_offset: u32) {
2453 self.vlsn_index.register(vlsn, file_number, file_offset);
2454 }
2455
2456 /// Register a VLSN→LSN mapping with its `LogEntryType`, so `lastSync` /
2457 /// `lastTxnEnd` advance (JE `VLSNRange.getUpdateForNewMapping`). Used by
2458 /// the syncup driver/tests that apply VLSN-tagged entries to a real log
2459 /// and need the sync/commit boundaries to track the stream.
2460 pub fn register_vlsn_typed(
2461 &self,
2462 vlsn: u64,
2463 file_number: u32,
2464 file_offset: u32,
2465 entry_type: noxu_log::LogEntryType,
2466 ) {
2467 self.vlsn_index.register_with_type(
2468 vlsn,
2469 file_number,
2470 file_offset,
2471 entry_type,
2472 );
2473 }
2474
2475 /// Replicate a freshly committed log entry from the master.
2476 ///
2477 /// Closes finding F9 of the 2026 review.
2478 ///
2479 /// Combines `register_vlsn` with a push into the in-memory
2480 /// `peer_scanner` so that downstream replicas pulling from this
2481 /// node's `PEER_FEEDER` service (via `catch_up_from_peer`) can
2482 /// stream the entry without round-tripping through the on-disk
2483 /// log. The local log is still the source of truth; the peer
2484 /// scanner is a fast-path cache that bounds itself via
2485 /// `PeerLogScanner::with_capacity` so old entries are evicted.
2486 ///
2487 /// Should be called by the master after the local commit has
2488 /// fsynced. Calling on a non-master is harmless (the peer
2489 /// scanner cache is also used by replicas) but is logged at trace
2490 /// level for diagnostics.
2491 pub fn replicate_entry(
2492 &self,
2493 vlsn: u64,
2494 file_number: u32,
2495 file_offset: u32,
2496 entry_type: u8,
2497 data: Vec<u8>,
2498 ) {
2499 // Register VLSN -> LSN, dispatching entry type so lastSync /
2500 // lastTxnEnd advance (REP-5; JE VLSNRange.getUpdateForNewMapping).
2501 // An unknown type byte falls back to extend-only registration.
2502 match noxu_log::LogEntryType::from_type_num(entry_type) {
2503 Some(et) => self.vlsn_index.register_with_type(
2504 vlsn,
2505 file_number,
2506 file_offset,
2507 et,
2508 ),
2509 None => self.vlsn_index.register(vlsn, file_number, file_offset),
2510 }
2511 // Pull path: shared peer_scanner serves replicas connecting via
2512 // PeerFeederService (catch_up_from_peer).
2513 self.peer_scanner.push(vlsn, entry_type, data.clone());
2514 // Push path (C-C2): fan out to per-replica FeederRunner queues so
2515 // that threads spawned by become_master can stream entries to each
2516 // registered replica without competing with PeerFeederService.
2517 {
2518 let queues = self.feeder_queues.read().unwrap();
2519 for queue in queues.values() {
2520 queue.push(vlsn, entry_type, data.clone());
2521 }
2522 }
2523 if !self.is_master() {
2524 log::trace!(
2525 "replicate_entry called on non-master node '{}': vlsn={}, type={}",
2526 self.config.node_name,
2527 vlsn,
2528 entry_type,
2529 );
2530 }
2531 }
2532
2533 /// Apply a replicated entry (as replica).
2534 ///
2535 /// Applies a log entry received from the master. This is called by the
2536 /// replica stream handler after receiving an entry from the feeder.
2537 ///
2538 /// `data` is the wire-encoded log-record payload. When the
2539 /// replicated environment has not been wired to a local
2540 /// `noxu_db::Environment` (i.e., before `with_environment` is
2541 /// called) the payload is forwarded into the in-memory peer
2542 /// scanner so that downstream replicas attached to the
2543 /// `PEER_FEEDER` service can re-stream it; the local log is **not**
2544 /// updated. This is documented behaviour rather than a stub — see
2545 /// the 2026 review finding #26 (medium) for the
2546 /// `with_environment`-required local-apply path.
2547 /// cleanup (rep info F35: `_data` placeholder) renames the leading
2548 /// underscore so reviewers don't read it as a TODO.
2549 pub fn apply_entry(
2550 &self,
2551 vlsn: u64,
2552 entry_type: u8,
2553 data: Vec<u8>,
2554 ) -> Result<()> {
2555 if self.is_shutdown() {
2556 return Err(RepError::StateError(
2557 "Cannot apply entry: environment is closed".to_string(),
2558 ));
2559 }
2560
2561 // Register the VLSN in the index, dispatching entry type so
2562 // lastSync/lastTxnEnd advance (REP-5; JE
2563 // VLSNRange.getUpdateForNewMapping).
2564 match noxu_log::LogEntryType::from_type_num(entry_type) {
2565 Some(et) => self.vlsn_index.register_with_type(vlsn, 0, 0, et),
2566 None => self.vlsn_index.register(vlsn, 0, 0),
2567 }
2568
2569 // Push into the peer log scanner so downstream replicas can
2570 // receive this entry via the PEER_FEEDER service.
2571 self.peer_scanner.push(vlsn, entry_type, data.clone());
2572 // C-C2 push path: fan out to per-replica FeederRunner queues.
2573 {
2574 let queues = self.feeder_queues.read().unwrap();
2575 for queue in queues.values() {
2576 queue.push(vlsn, entry_type, data.clone());
2577 }
2578 }
2579
2580 log::trace!(
2581 "Applied replicated entry: vlsn={}, type={}",
2582 vlsn,
2583 entry_type
2584 );
2585 Ok(())
2586 }
2587
2588 /// Record an ack from a replica (as master).
2589 ///
2590 /// Records that the specified replica has acknowledged processing up to
2591 /// the given VLSN. This is used by the master to track durability
2592 /// guarantees.
2593 pub fn record_ack(&self, vlsn: u64, replica_name: &str) {
2594 // Only acks from ELECTABLE replicas count toward the durability
2595 // quorum (JE DurabilityQuorum.replicaAcksQualify: Monitors and
2596 // Secondaries do not qualify). An ack from a non-electable / unknown
2597 // node is recorded for stats elsewhere but must not satisfy the
2598 // ReplicaAckPolicy. If the node is unknown to the group view we err
2599 // toward NOT counting it.
2600 let qualifies = self
2601 .get_rep_group()
2602 .get_node(replica_name)
2603 .map(|n| n.node_type().is_electable())
2604 .unwrap_or(false);
2605 if qualifies {
2606 self.ack_tracker.record_ack(vlsn, replica_name);
2607 }
2608 // REP-9 Part 1: advance the matching `Feeder::acked_vlsn` high-water
2609 // mark (read by `update_dtvlsn_from_feeders` and exposed via
2610 // `get_acked_vlsn`). The production `FeederRunner` previously updated
2611 // only its private `known_replica_vlsn`, so the DTVLSN ranking never
2612 // saw production progress (JE `Feeder.getReplicaTxnEndVLSN`). We
2613 // record the high-water for *any* replica (electable or not); the
2614 // electable filter is reapplied when DTVLSN/quorum is computed.
2615 for feeder in self.feeders.read().iter() {
2616 if feeder.get_replica_name() == replica_name {
2617 feeder.record_ack(vlsn);
2618 break;
2619 }
2620 }
2621 // Recompute the DTVLSN from feeder progress whenever an ack lands.
2622 self.update_dtvlsn_from_feeders();
2623 // REP-9: wake any committer parked in `await_replica_acks`. Its
2624 // satisfaction predicate is the high-water feeder count, not an
2625 // exact-VLSN registration, so we must notify unconditionally (the
2626 // AckTracker's own `record_ack` only notifies when the exact VLSN was
2627 // registered, which the per-frame feeder acks generally are not).
2628 self.ack_tracker.notify_waiters();
2629 }
2630
2631 /// Returns the current Durable Transaction VLSN (D7, JE RepNode.getDTVLSN).
2632 /// The highest VLSN replicated to a majority of electable replicas; 0 if
2633 /// none yet. Used by the election ranking so the most-durable node wins.
2634 pub fn get_dtvlsn(&self) -> u64 {
2635 self.dtvlsn.load(std::sync::atomic::Ordering::Acquire)
2636 }
2637
2638 /// Advance the DTVLSN to `candidate` if it is greater (JE
2639 /// RepNode.updateDTVLSN — an `AtomicLongMax.updateMax`). The DTVLSN can
2640 /// only move forward. Returns the resulting (possibly unchanged) value.
2641 pub fn update_dtvlsn(&self, candidate: u64) -> u64 {
2642 use std::sync::atomic::Ordering;
2643 let mut cur = self.dtvlsn.load(Ordering::Acquire);
2644 while candidate > cur {
2645 match self.dtvlsn.compare_exchange_weak(
2646 cur,
2647 candidate,
2648 Ordering::AcqRel,
2649 Ordering::Acquire,
2650 ) {
2651 Ok(_) => return candidate,
2652 Err(observed) => cur = observed,
2653 }
2654 }
2655 cur
2656 }
2657
2658 /// Set the DTVLSN from the replication stream (JE RepNode.setDTVLSN —
2659 /// used exclusively by the replica, which maintains the DTVLSN from
2660 /// commit/abort records). Still enforced as advance-only via update_max so
2661 /// an out-of-order or stale record cannot move it backward.
2662 pub fn set_dtvlsn(&self, vlsn: u64) {
2663 self.update_dtvlsn(vlsn);
2664 }
2665
2666 /// Master-side DTVLSN computation (D7, JE FeederManager.updateDTVLSN):
2667 /// across the *qualifying* (electable) feeders whose replica-txn-end VLSN
2668 /// exceeds the current DTVLSN, take the minimum; once a SIMPLE_MAJORITY
2669 /// ack-count of them exceeds the current value, advance the DTVLSN to that
2670 /// minimum (a transaction is durable once a majority hold it).
2671 fn update_dtvlsn_from_feeders(&self) {
2672 if !self.is_master() {
2673 return;
2674 }
2675 let curr = self.get_dtvlsn();
2676
2677 // SIMPLE_MAJORITY required-ack-count over the electable group,
2678 // computed the same way as await_replica_acks.
2679 let group = self.get_rep_group();
2680 let electable_peers: u32 = group
2681 .get_nodes()
2682 .iter()
2683 .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2684 .count() as u32;
2685 let electable_count = electable_peers + 1; // +1 for self/master
2686 // required electable acks for SIMPLE_MAJORITY = floor(n/2) replicas
2687 // (the master self-acks; a majority is reached when this many peers
2688 // also hold the VLSN).
2689 let durable_ack_count = electable_count / 2;
2690 if durable_ack_count == 0 {
2691 // Single-node (or majority is self alone): the master's own log is
2692 // immediately durable up to its latest VLSN.
2693 self.update_dtvlsn(self.get_current_vlsn());
2694 return;
2695 }
2696
2697 let mut min = u64::MAX;
2698 let mut ack_count: u32 = 0;
2699 for feeder in self.feeders.read().iter() {
2700 // replicaAcksQualify: only electable feeders count (D6).
2701 let qualifies = group
2702 .get_node(&feeder.get_replica_name())
2703 .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2704 .unwrap_or(false);
2705 if !qualifies {
2706 continue;
2707 }
2708 let replica_vlsn = feeder.get_acked_vlsn();
2709 if replica_vlsn <= curr {
2710 continue;
2711 }
2712 if replica_vlsn < min {
2713 min = replica_vlsn;
2714 }
2715 ack_count += 1;
2716 if ack_count >= durable_ack_count {
2717 // A majority of electable replicas hold >= min: durable.
2718 self.update_dtvlsn(min);
2719 return;
2720 }
2721 }
2722 // DTVLSN unchanged.
2723 }
2724
2725 /// REP-9: count qualifying (electable) feeders whose acked high-water VLSN
2726 /// is `>= commit_vlsn`. This is the Rust equivalent of JE
2727 /// `FeederManager.getNumCurrentAckFeeders(commitVLSN)` — the durability
2728 /// quorum is satisfied when this count reaches the required ack count.
2729 /// Only Electable replicas qualify (D6, JE
2730 /// `DurabilityQuorum.replicaAcksQualify`).
2731 fn count_ack_feeders_ge(&self, commit_vlsn: u64) -> u32 {
2732 let group = self.get_rep_group();
2733 let mut count = 0u32;
2734 for feeder in self.feeders.read().iter() {
2735 let qualifies = group
2736 .get_node(&feeder.get_replica_name())
2737 .map(|n| n.node_type == crate::node_type::NodeType::Electable)
2738 .unwrap_or(false);
2739 // A feeder counts only if it has acked a *real* VLSN at or above
2740 // the commit VLSN. `acked_vlsn == 0` is the NULL sentinel (no ack
2741 // yet) and must never satisfy a commit, even when `commit_vlsn`
2742 // itself is 0 (no replicated commit logged) — mirrors JE
2743 // `getReplicaTxnEndVLSN()` returning NULL_VLSN for a fresh feeder,
2744 // which is not `>=` any commit VLSN.
2745 let acked = feeder.get_acked_vlsn();
2746 if qualifies && acked > 0 && acked >= commit_vlsn {
2747 count += 1;
2748 }
2749 }
2750 count
2751 }
2752
2753 /// Set the state change listener.
2754 ///
2755 ///
2756 ///
2757 /// Sets the listener used to receive asynchronous replication node state
2758 /// change events. Note that there is one listener per replication node,
2759 /// not one per handle. Invoking this method adds to the set of listeners.
2760 ///
2761 /// Invoking this method typically results in an immediate callback to the
2762 /// application via the `on_state_change` method, so that the application
2763 /// is made aware of the existing state of the node at the time the listener
2764 /// is first established.
2765 pub fn set_state_change_listener(
2766 &self,
2767 listener: Arc<dyn StateChangeListener>,
2768 ) {
2769 // Immediately notify the listener of the current state
2770 let current_state = self.node_state.get_state();
2771 let event = StateChangeEvent::new(
2772 current_state,
2773 current_state,
2774 self.get_master_name(),
2775 );
2776 listener.on_state_change(event);
2777
2778 let mut listeners = self.listeners.write();
2779 listeners.push(listener);
2780 }
2781
2782 /// Close the replicated environment.
2783 ///
2784 ///
2785 ///
2786 /// Closes this handle and releases any resources. When closed, daemon
2787 /// threads are stopped, even if they are performing work. The node ceases
2788 /// participation in the replication group. If the node was currently the
2789 /// master, the rest of the group will hold an election.
2790 ///
2791 /// The ReplicatedEnvironment should not be closed while any other type of
2792 /// handle that refers to it is not yet closed.
2793 pub fn close(&self) -> Result<()> {
2794 if self.shutdown.swap(true, Ordering::SeqCst) {
2795 // Already closed
2796 return Ok(());
2797 }
2798
2799 let old_state = self.node_state.get_state();
2800
2801 // Transition to Shutdown state. The state machine allows this from
2802 // any non-Shutdown state.
2803 let _ = self.node_state.transition_to(NodeState::Shutdown);
2804
2805 // Notify listeners of the shutdown
2806 self.notify_listeners(old_state, NodeState::Shutdown);
2807
2808 // Clear feeders
2809 {
2810 let mut feeders = self.feeders.write();
2811 feeders.clear();
2812 }
2813
2814 // C-C2: close all registered feeder channels so FeederRunner threads
2815 // observe ChannelClosed and exit their run() loops cleanly.
2816 {
2817 let channels = self.feeder_channels.lock().unwrap();
2818 for (name, ch) in channels.iter() {
2819 if let Err(e) = ch.close() {
2820 log::debug!(
2821 "close: feeder channel for '{}' already closed: {}",
2822 name,
2823 e
2824 );
2825 }
2826 }
2827 }
2828 // Drop all active runners and queues so their Arcs release.
2829 self.active_feeder_runners.lock().unwrap().clear();
2830 self.feeder_queues.write().unwrap().clear();
2831
2832 // Signal and join all I/O threads spawned by become_master /
2833 // become_replica / start_vlsn_persistence_daemon. The vlsn-flush
2834 // thread does a final flush on its way out so a clean close is
2835 // recoverable. Closes finding F11.
2836 self.io_shutdown.store(true, Ordering::SeqCst);
2837 {
2838 let mut threads = self.io_threads.lock().unwrap();
2839 for handle in threads.drain(..) {
2840 let _ = handle.join();
2841 }
2842 }
2843
2844 // Belt-and-braces: even when no daemon is running (e.g.
2845 // `ReplicatedEnvironment::new` without `open`), persist a final
2846 // snapshot if env_home is configured.
2847 if let Some(ref home) = self.config.env_home
2848 && let Err(e) =
2849 crate::vlsn::persist::flush_to_disk(&self.vlsn_index, home)
2850 {
2851 log::warn!(
2852 "close: failed to persist VLSN index to {}: {}",
2853 home.display(),
2854 e
2855 );
2856 }
2857
2858 // Stop the service dispatcher (the: serviceDispatcher.shutdown()).
2859 if let Some(ref dispatcher) = self.tcp_dispatcher {
2860 dispatcher.stop();
2861 let kind = if dispatcher.is_tls() { "TLS" } else { "TCP" };
2862 log::debug!(
2863 "Node '{}' {} service dispatcher stopped",
2864 self.config.node_name.as_str(),
2865 kind,
2866 );
2867 }
2868
2869 log::info!(
2870 "Replicated environment '{}' in group '{}' closed",
2871 self.config.node_name.as_str(),
2872 self.config.group_name.as_str()
2873 );
2874
2875 Ok(())
2876 }
2877
2878 /// Close this handle and shut down the Replication Group by forcing all
2879 /// active Replicas to exit.
2880 ///
2881 ///
2882 ///
2883 /// This method must be invoked on the node that's currently the Master
2884 /// after all other outstanding handles have been closed.
2885 ///
2886 /// When push-feeder threads are active (registered via
2887 /// [`Self::register_feeder_channel`]), the master first waits up to half
2888 /// of `replica_shutdown_timeout_ms` for each FeederRunner replica to
2889 /// acknowledge all outstanding log entries (VLSN catch-up). Replicas
2890 /// that do not catch up within the budget receive a warning; the master
2891 /// proceeds to send `SHUTDOWN_GROUP` regardless. This closes finding M-4
2892 /// of the v3.x production-readiness review.
2893 ///
2894 /// Replicas that are not fed via a registered channel (pull-based
2895 /// `PeerFeederService` path) are sent `SHUTDOWN_GROUP` without a
2896 /// VLSN-level catch-up wait — that wait requires per-replica ack tracking
2897 /// which the pull path does not yet provide.
2898 pub fn shutdown_group(
2899 &self,
2900 replica_shutdown_timeout_ms: u64,
2901 ) -> Result<()> {
2902 if !self.is_master() {
2903 return Err(RepError::InvalidState(
2904 "shutdownGroup must be invoked on the master".to_string(),
2905 ));
2906 }
2907
2908 log::info!(
2909 "Node '{}' shutting down replication group '{}' (replica_timeout={}ms)",
2910 self.config.node_name.as_str(),
2911 self.config.group_name.as_str(),
2912 replica_shutdown_timeout_ms,
2913 );
2914
2915 // M-4: Wait for active FeederRunner replicas to ack the master's
2916 // current VLSN before sending SHUTDOWN_GROUP. We allow up to half
2917 // the overall timeout for the catch-up phase so the second half
2918 // remains for the SHUTDOWN_GROUP send loop.
2919 let catchup_budget_ms = replica_shutdown_timeout_ms / 2;
2920 if catchup_budget_ms > 0 {
2921 let master_vlsn = self.vlsn_index.get_range().last();
2922 if master_vlsn > 0 {
2923 let runners: Vec<(String, Arc<FeederRunner>)> = self
2924 .active_feeder_runners
2925 .lock()
2926 .unwrap()
2927 .iter()
2928 .map(|(k, v)| (k.clone(), Arc::clone(v)))
2929 .collect();
2930 if !runners.is_empty() {
2931 let catchup_deadline = std::time::Instant::now()
2932 + Duration::from_millis(catchup_budget_ms);
2933 for (name, runner) in &runners {
2934 loop {
2935 let acked = runner.known_replica_vlsn();
2936 if acked >= master_vlsn
2937 || std::time::Instant::now() >= catchup_deadline
2938 {
2939 if acked < master_vlsn {
2940 log::warn!(
2941 "shutdown_group: replica '{}' acked \
2942 VLSN {} < master VLSN {}; proceeding",
2943 name,
2944 acked,
2945 master_vlsn,
2946 );
2947 } else {
2948 log::info!(
2949 "shutdown_group: replica '{}' caught up \
2950 to VLSN {}",
2951 name,
2952 acked,
2953 );
2954 }
2955 break;
2956 }
2957 std::thread::sleep(Duration::from_millis(10));
2958 }
2959 }
2960 }
2961 }
2962 }
2963
2964 // Closes finding F8 of the 2026 review.
2965 //
2966 // Send SHUTDOWN_GROUP to every known peer. The recipient calls
2967 // its own `close()` and the per-connection ADMIN handler
2968 // returns ACK_OK. Any peer that doesn't ack within the
2969 // timeout is logged and the master proceeds. After signalling
2970 // every peer, the master closes its own env.
2971 let deadline = std::time::Instant::now()
2972 + Duration::from_millis(replica_shutdown_timeout_ms);
2973
2974 for peer in self.group_service.get_all_nodes() {
2975 if peer.name == self.config.node_name {
2976 continue;
2977 }
2978 // Don't exceed the deadline waiting for any single peer.
2979 let now = std::time::Instant::now();
2980 if now >= deadline {
2981 log::warn!(
2982 "shutdown_group: deadline reached; skipping remaining peers"
2983 );
2984 break;
2985 }
2986 let addr_str = format!("{}:{}", peer.host, peer.port);
2987 let addr = match addr_str.parse::<SocketAddr>() {
2988 Ok(a) => a,
2989 Err(e) => {
2990 log::warn!(
2991 "shutdown_group: peer '{}' has bad address {}: {}",
2992 peer.name,
2993 addr_str,
2994 e
2995 );
2996 continue;
2997 }
2998 };
2999 match crate::group_admin::send_shutdown_group(addr) {
3000 Ok(true) => log::info!(
3001 "shutdown_group: peer '{}' acknowledged",
3002 peer.name
3003 ),
3004 Ok(false) => log::warn!(
3005 "shutdown_group: peer '{}' rejected the request",
3006 peer.name
3007 ),
3008 Err(e) => log::warn!(
3009 "shutdown_group: peer '{}' unreachable: {}",
3010 peer.name,
3011 e
3012 ),
3013 }
3014 }
3015
3016 // Master closes itself last.
3017 self.close()
3018 }
3019
3020 /// Check if shutdown is in progress.
3021 pub fn is_shutdown(&self) -> bool {
3022 self.shutdown.load(Ordering::SeqCst)
3023 }
3024
3025 /// Notify all registered listeners of a state change.
3026 fn notify_listeners(&self, old_state: NodeState, new_state: NodeState) {
3027 let listeners = self.listeners.read();
3028 if !listeners.is_empty() {
3029 let event = StateChangeEvent::new(
3030 old_state,
3031 new_state,
3032 self.get_master_name(),
3033 );
3034 for listener in listeners.iter() {
3035 listener.on_state_change(event.clone());
3036 }
3037 }
3038 }
3039}
3040
3041// ---------------------------------------------------------------------------
3042// F1: ReplicaAckCoordinator impl wires master commits into the AckTracker.
3043// ---------------------------------------------------------------------------
3044//
3045// `noxu_db::Transaction::commit_with_durability` calls
3046// `await_replica_acks` after the local WAL fsync. This impl:
3047//
3048// 1. Rejects calls on a non-master node with `NotMaster`.
3049// 2. Rejects calls during shutdown with `Shutdown`.
3050// 3. Computes the required ack count from `electable_count` and the
3051// requested policy.
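// 4. Takes the commit's VLSN (the latest value of the shared WAL VLSN
//    counter) as the key, registers it on the `AckTracker` so that
//    `record_ack` wakes the waiter, and blocks on the tracker's condvar
//    until a quorum of electable feeders has acked a VLSN >= the commit
//    VLSN, the timeout elapses, or shutdown begins (no sleep-polling).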
3056// 5. Cleans up the tracker entry on every exit path.
3057//
3058// Closes finding F1 of the 2026 review.
3059impl ReplicaAckCoordinator for ReplicatedEnvironment {
3060 fn await_replica_acks(
3061 &self,
3062 policy: ReplicaAckPolicyKind,
3063 timeout: Duration,
3064 ) -> std::result::Result<u32, AckWaitError> {
3065 // Fast-path: ReplicaAckPolicy::None never blocks. The trait spec
3066 // says callers may already short-circuit, but be defensive.
3067 if matches!(policy, ReplicaAckPolicyKind::None) {
3068 return Ok(0);
3069 }
3070
3071 if self.is_shutdown() {
3072 return Err(AckWaitError {
3073 kind: AckWaitErrorKind::Shutdown,
3074 needed: 0,
3075 received: 0,
3076 });
3077 }
3078
3079 if !self.is_master() {
3080 return Err(AckWaitError {
3081 kind: AckWaitErrorKind::NotMaster,
3082 needed: 0,
3083 received: 0,
3084 });
3085 }
3086
3087 // Count electable peers (excluding the master) using the
3088 // RepGroup view, which counts Arbiters and Electables
3089 // identically. Only Electable nodes are counted as data
3090 // replicas able to ack a commit. The master itself is
3091 // *implicit*: it is not registered in `group_service` (only
3092 // peers are), so we add 1 to obtain the total electable
3093 // count expected by `ReplicaAckPolicyKind::required_acks`.
3094 let group = self.get_rep_group();
3095 let electable_peers: u32 = group
3096 .get_nodes()
3097 .iter()
3098 .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
3099 .count() as u32;
3100 let electable_count: u32 = electable_peers + 1; // +1 for self/master
3101
3102 let needed = policy.required_acks(electable_count);
3103 if needed == 0 {
3104 // Single-node group, or All with only the master itself.
3105 return Ok(0);
3106 }
3107
3108 // REP-9 Part 2: the commit's VLSN is the key. The master assigns a
3109 // VLSN when it logs the TxnCommit (via the shared `wal_vlsn_counter`
3110 // bumped in `EnvironmentImpl::log_txn_commit`), immediately before
3111 // this gate runs. The latest assigned VLSN therefore IS this
3112 // commit's VLSN (the trait contract: "implementations are responsible
3113 // for assigning the commit VLSN internally"). We wait until a quorum
3114 // of qualifying electable replicas have acked a VLSN >= the commit
3115 // VLSN — faithful to JE `FeederManager.getNumCurrentAckFeeders`, which
3116 // counts feeders whose `getReplicaTxnEndVLSN() >= commitVLSN` (a
3117 // high-water `>=` test, NOT an exact-VLSN match).
3118 //
3119 // ponytail: reads the global high-water VLSN, so a concurrent later
3120 // commit can make this gate wait on a slightly higher VLSN than its
3121 // own. That is strictly SAFE (waiting for >= a newer VLSN never
3122 // returns early) and only marginally less precise; thread the
3123 // per-txn VLSN through the trait if exact per-commit granularity is
3124 // ever needed.
3125 let commit_vlsn = self.wal_vlsn_counter.load(Ordering::Acquire);
3126
3127 // Register on the AckTracker too: this is what `record_ack` notifies,
3128 // so the condvar wakes us as acks land. The satisfaction decision
3129 // itself is the high-water feeder count below.
3130 self.ack_tracker.register(commit_vlsn, needed);
3131
3132 // Block on the ack condvar until a quorum of electable feeders hold
3133 // the commit VLSN, the timeout elapses, or shutdown is signalled — no
3134 // spin-poll (JE FeederTxns.TxnInfo uses a per-transaction
3135 // CountDownLatch.await; the AckTracker condvar is the shared-mutex
3136 // equivalent). record_ack notifies us as acks arrive.
3137 let satisfied = self.ack_tracker.wait_for_predicate(
3138 timeout,
3139 || self.count_ack_feeders_ge(commit_vlsn) >= needed,
3140 || self.is_shutdown(),
3141 );
3142 if satisfied {
3143 self.ack_tracker.cleanup_through(commit_vlsn);
3144 return Ok(needed);
3145 }
3146 if self.is_shutdown() {
3147 self.ack_tracker.cleanup_through(commit_vlsn);
3148 return Err(AckWaitError {
3149 kind: AckWaitErrorKind::Shutdown,
3150 needed,
3151 received: 0,
3152 });
3153 }
3154 // Timed out: report the partial ack count (qualifying electable
3155 // feeders holding the commit VLSN) so the caller can surface
3156 // InsufficientReplicas.
3157 let received = self.count_ack_feeders_ge(commit_vlsn);
3158 self.ack_tracker.cleanup_through(commit_vlsn);
3159 Err(AckWaitError { kind: AckWaitErrorKind::Timeout, needed, received })
3160 }
3161
3162 /// X-3: allocate the next VLSN for a recovered XA commit and register
3163 /// `lsn` in the VLSN index so feeders can stream the commit.
3164 ///
3165 /// Increments off the current latest VLSN so the new VLSN is strictly
3166 /// monotonically increasing. In a single-node or master-less environment
3167 /// (not master) returns 0 (NULL_VLSN — harmless, the default).
3168 fn alloc_vlsn_for_recovered_commit(&self, lsn: noxu_util::Lsn) -> u64 {
3169 // Only allocate a VLSN when we are the master; on a replica the
3170 // recovered XA should have been replicated by the original master.
3171 if !self.is_master() {
3172 return 0;
3173 }
3174 let next_vlsn = self.vlsn_index.get_latest_vlsn() + 1;
3175 // A recovered XA commit is a commit log entry; dispatch as TxnCommit
3176 // so lastTxnEnd/lastSync advance (REP-5).
3177 self.vlsn_index.register_with_type(
3178 next_vlsn,
3179 lsn.file_number(),
3180 lsn.file_offset(),
3181 noxu_log::LogEntryType::TxnCommit,
3182 );
3183 log::debug!(
3184 "alloc_vlsn_for_recovered_commit: allocated vlsn={} for lsn={:?}",
3185 next_vlsn,
3186 lsn
3187 );
3188 next_vlsn
3189 }
3190
3191 /// R-3: pre-allocate the next commit VLSN WITHOUT registering in the index.
3192 ///
3193 /// The caller writes the `TxnCommit` WAL entry with this VLSN embedded,
3194 /// then calls `register_recovered_commit_vlsn` with the actual commit LSN.
3195 /// This two-step approach ensures the WAL entry carries the VLSN so the
3196 /// X-14 VLSN rebuild on second crash can find it.
3197 fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64 {
3198 if !self.is_master() {
3199 return 0;
3200 }
3201 // Peek at the next VLSN without registering. The actual registration
3202 // happens in register_recovered_commit_vlsn() after the WAL write.
3203 self.vlsn_index.get_latest_vlsn() + 1
3204 }
3205
3206 /// R-3: register a pre-allocated VLSN in the VLSN index with the actual
3207 /// commit LSN. Called after writing the `TxnCommit` WAL entry.
3208 fn register_recovered_commit_vlsn(
3209 &self,
3210 vlsn: u64,
3211 commit_lsn: noxu_util::Lsn,
3212 ) {
3213 if vlsn == 0 || !self.is_master() {
3214 return;
3215 }
3216 // The pre-allocated VLSN is for a TxnCommit WAL entry; dispatch the
3217 // type so lastTxnEnd/lastSync advance (REP-5).
3218 self.vlsn_index.register_with_type(
3219 vlsn,
3220 commit_lsn.file_number(),
3221 commit_lsn.file_offset(),
3222 noxu_log::LogEntryType::TxnCommit,
3223 );
3224 log::debug!(
3225 "register_recovered_commit_vlsn: registered vlsn={} for commit_lsn={:?}",
3226 vlsn,
3227 commit_lsn
3228 );
3229 }
3230}
3231
3232#[cfg(test)]
3233mod tests {
3234 use super::*;
3235 use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};
3236
3237 /// Helper to create a test config with a fixed port (unit-test style,
3238 /// no real TCP bind needed — hostname "localhost" resolves but the port
3239 /// might be in use; use `test_config_port0` for real TCP tests).
3240 fn test_config(node_name: &str) -> RepConfig {
3241 RepConfig::builder("test_group", node_name, "localhost")
3242 .node_port(5001)
3243 .build()
3244 }
3245
3246 /// Helper to create a test config that binds to an OS-assigned port.
3247 fn test_config_port0(node_name: &str) -> RepConfig {
3248 RepConfig::builder("test_group", node_name, "127.0.0.1")
3249 .node_port(0)
3250 .build()
3251 }
3252
3253 #[test]
3254 fn test_initial_state_is_detached() {
3255 let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3256 // NodeStateMachine starts in Detached state
3257 assert_eq!(env.get_state(), NodeState::Detached);
3258 assert!(!env.is_master());
3259 assert!(!env.is_replica());
3260 assert!(!env.is_active());
3261 }
3262
3263 #[test]
3264 fn test_become_master() {
3265 let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3266 env.become_master(1).unwrap();
3267 assert_eq!(env.get_state(), NodeState::Master);
3268 assert!(env.is_master());
3269 assert!(!env.is_replica());
3270 assert!(env.is_active());
3271 }
3272
3273 #[test]
3274 fn test_become_replica() {
3275 let env = ReplicatedEnvironment::new(test_config("node2")).unwrap();
3276 env.become_replica("node1").unwrap();
3277 assert_eq!(env.get_state(), NodeState::Replica);
3278 assert!(!env.is_master());
3279 assert!(env.is_replica());
3280 assert!(env.is_active());
3281 }
3282
3283 #[test]
3284 fn test_get_node_name() {
3285 let env = ReplicatedEnvironment::new(test_config("my_node")).unwrap();
3286 assert_eq!(env.get_node_name(), "my_node");
3287 }
3288
3289 #[test]
3290 fn test_get_group_name() {
3291 let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3292 assert_eq!(env.get_group_name(), "test_group");
3293 }
3294
3295 #[test]
3296 fn test_register_vlsn_updates_index() {
3297 let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3298 env.register_vlsn(1, 0, 100);
3299 env.register_vlsn(2, 0, 200);
3300 env.register_vlsn(3, 0, 300);
3301
3302 assert_eq!(env.get_current_vlsn(), 3);
3303 let range = env.get_vlsn_range();
3304 assert_eq!(range.first(), 1);
3305 assert_eq!(range.last(), 3);
3306 }
3307
3308 #[test]
3309 fn test_record_ack() {
3310 use crate::node_type::NodeType;
3311 use crate::rep_node::RepNode;
3312 let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3313 env.become_master(1).unwrap();
3314 // replicaAcksQualify: only ELECTABLE replicas count toward durability,
3315 // so the replica must be a known electable member of the group.
3316 env.add_peer(RepNode::new(
3317 "replica1".to_string(),
3318 NodeType::Electable,
3319 "127.0.0.1".to_string(),
3320 6001,
3321 2,
3322 ))
3323 .unwrap();
3324
3325 env.register_vlsn(1, 0, 100);
3326 // Register a pending ack requirement, then record ack
3327 env.get_ack_tracker().register(1, 1);
3328 env.record_ack(1, "replica1");
3329 // Ack should be satisfied
3330 assert!(env.get_ack_tracker().is_satisfied(1));
3331 }
3332
3333 #[test]
3334 fn test_record_ack_from_non_electable_does_not_qualify() {
3335 use crate::node_type::NodeType;
3336 use crate::rep_node::RepNode;
3337 let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3338 env.become_master(1).unwrap();
3339 // A Monitor is NOT electable -> its ack must not count (JE
3340 // DurabilityQuorum.replicaAcksQualify).
3341 env.add_peer(RepNode::new(
3342 "monitor1".to_string(),
3343 NodeType::Monitor,
3344 "127.0.0.1".to_string(),
3345 6002,
3346 3,
3347 ))
3348 .unwrap();
3349 env.register_vlsn(1, 0, 100);
3350 env.get_ack_tracker().register(1, 1);
3351 env.record_ack(1, "monitor1");
3352 assert!(
3353 !env.get_ack_tracker().is_satisfied(1),
3354 "non-electable ack must not satisfy durability quorum"
3355 );
3356 // An unknown replica likewise does not qualify.
3357 env.record_ack(1, "ghost");
3358 assert!(!env.get_ack_tracker().is_satisfied(1));
3359 }
3360
3361 #[test]
3362 fn test_authoritative_quorum_met() {
3363 // 1-node group (electable_total=1): master alone IS authoritative
3364 // (quorum_size = 1/2+1 = 1; 0 replicas + 1 >= 1).
3365 assert!(ReplicatedEnvironment::authoritative_quorum_met(0, 1));
3366 // 3-node group (electable_total=3, quorum_size=2): master with 0
3367 // connected replicas is the minority -> NOT authoritative.
3368 assert!(!ReplicatedEnvironment::authoritative_quorum_met(0, 3));
3369 // 3-node group with 1 connected electable replica -> 1+1=2 >= 2 -> yes.
3370 assert!(ReplicatedEnvironment::authoritative_quorum_met(1, 3));
3371 // 5-node group (quorum_size=3): need 2 connected replicas.
3372 assert!(!ReplicatedEnvironment::authoritative_quorum_met(1, 5));
3373 assert!(ReplicatedEnvironment::authoritative_quorum_met(2, 5));
3374 }
3375
3376 #[test]
3377 fn test_is_authoritative_master_requires_master_role() {
3378 // A non-master is never authoritative regardless of connections.
3379 let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3380 assert!(!env.is_master());
3381 assert!(!env.is_authoritative_master());
3382 // A single-node master (no peers) IS authoritative.
3383 env.become_master(1).unwrap();
3384 assert!(env.is_authoritative_master());
3385 }
3386
3387 #[test]
3388 fn test_dtvlsn_update_max_advances_only() {
3389 let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3390 assert_eq!(env.get_dtvlsn(), 0);
3391 assert_eq!(env.update_dtvlsn(10), 10);
3392 assert_eq!(env.get_dtvlsn(), 10);
3393 // A lower candidate must not move it backward.
3394 assert_eq!(env.update_dtvlsn(5), 10);
3395 assert_eq!(env.get_dtvlsn(), 10);
3396 // Equal is a no-op.
3397 assert_eq!(env.update_dtvlsn(10), 10);
3398 // set_dtvlsn (replica path) is also advance-only.
3399 env.set_dtvlsn(7);
3400 assert_eq!(env.get_dtvlsn(), 10);
3401 env.set_dtvlsn(20);
3402 assert_eq!(env.get_dtvlsn(), 20);
3403 }
3404
3405 #[test]
3406 fn test_dtvlsn_majority_min_across_feeders() {
3407 use crate::node_type::NodeType;
3408 use crate::rep_node::RepNode;
3409 let env = ReplicatedEnvironment::new(test_config("master")).unwrap();
3410 env.become_master(1).unwrap();
3411 // Three electable replicas → electable_count = 4 (incl. master) →
3412 // durable_ack_count = 2. With master self-ack, DTVLSN advances to the
3413 // min of the 2 highest qualifying feeders that exceed the current
3414 // DTVLSN.
3415 for (i, name) in ["r1", "r2", "r3"].iter().enumerate() {
3416 env.add_peer(RepNode::new(
3417 name.to_string(),
3418 NodeType::Electable,
3419 "127.0.0.1".to_string(),
3420 6100 + i as u16,
3421 (i + 2) as u32,
3422 ))
3423 .unwrap();
3424 }
3425 // Register feeders with differing acked VLSNs: r1=100, r2=80, r3=50.
3426 for (name, vlsn) in [("r1", 100u64), ("r2", 80), ("r3", 50)] {
3427 let f = crate::stream::feeder::Feeder::new(name.to_string());
3428 f.record_ack(vlsn);
3429 env.feeders.write().push(f);
3430 }
3431 env.update_dtvlsn_from_feeders();
3432 // First two qualifying feeders encountered are r1(100), r2(80);
3433 // min(100,80)=80 and that is a majority (2 of 4) → DTVLSN = 80.
3434 // (r3=50 < 80 is not required for durability.)
3435 assert!(
3436 env.get_dtvlsn() >= 80,
3437 "DTVLSN must reach the majority-min (>=80), got {}",
3438 env.get_dtvlsn()
3439 );
3440 }
3441
3442 #[test]
3443 fn test_close_sets_shutdown() {
3444 let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3445 assert!(!env.is_shutdown());
3446
3447 env.close().unwrap();
3448 assert!(env.is_shutdown());
3449 // After close, state should be Shutdown
3450 assert_eq!(env.get_state(), NodeState::Shutdown);
3451 }
3452
3453 #[test]
3454 fn test_close_is_idempotent() {
3455 let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3456 env.close().unwrap();
3457 env.close().unwrap(); // Should not error
3458 assert!(env.is_shutdown());
3459 }
3460
3461 #[test]
3462 fn test_cannot_become_master_when_shutdown() {
3463 let env = ReplicatedEnvironment::new(test_config("node1")).unwrap();
3464 env.close().unwrap();
3465
3466 let result = env.become_master(1);
3467 assert!(result.is_err());
3468 }
3469
/// A closed environment rejects an attempt to become a replica.
#[test]
fn test_cannot_become_replica_when_shutdown() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.close().unwrap();
    assert!(node.become_replica("master").is_err());
}
3478
/// A closed environment refuses to apply replicated entries.
#[test]
fn test_cannot_apply_entry_when_shutdown() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.close().unwrap();
    assert!(node.apply_entry(1, 0, vec![1, 2, 3]).is_err());
}
3487
/// Only the current master may initiate a master transfer; a replica's
/// request is rejected.
#[test]
fn test_cannot_transfer_master_when_not_master() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.become_replica("other").unwrap();

    let outcome = node.transfer_master(MasterTransferConfig::new(
        "target_node".to_string(),
        Duration::from_secs(30),
    ));
    assert!(outcome.is_err());
}
3500
/// F7: `transfer_master` is not a no-op. It sends an ADMIN TRANSFER_MASTER
/// signal to the target via TCP, so a target that was never registered is
/// rejected while its address is being resolved.
#[test]
fn test_transfer_master_requires_registered_target() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.become_master(1).unwrap();

    let request = MasterTransferConfig::new(
        "unknown_target".to_string(),
        Duration::from_secs(30),
    );
    assert!(
        node.transfer_master(request).is_err(),
        "transfer_master to unregistered target must error"
    );
}
3519
/// Each entry applied on a replica advances the current VLSN.
#[test]
fn test_apply_entry_registers_vlsn() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.become_replica("master").unwrap();

    for (vlsn, payload) in [(1, vec![1, 2, 3]), (2, vec![4, 5, 6])] {
        node.apply_entry(vlsn, 0, payload).unwrap();
    }

    assert_eq!(node.get_current_vlsn(), 2);
}
3530
/// A fresh node knows no master. Once it becomes master, it reports itself.
#[test]
fn test_master_name_tracking() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

    assert_eq!(node.get_master_name(), None);

    node.become_master(1).unwrap();
    assert_eq!(node.get_master_name().as_deref(), Some("node1"));
}
3542
/// A master may step down to replica. The reported master name then
/// switches to the remote node it follows.
#[test]
fn test_master_to_replica_transition() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

    node.become_master(1).unwrap();
    assert_eq!(node.get_master_name().as_deref(), Some("node1"));

    // Master -> Replica is a legal transition.
    node.become_replica("other_master").unwrap();
    assert_eq!(node.get_master_name().as_deref(), Some("other_master"));
    assert!(node.is_replica());
}
3556
/// Registering a listener fires one immediate callback for the current
/// state, and every later transition fires another.
#[test]
fn test_state_change_listener_notification() {
    /// Counts callbacks and remembers the most recent target state.
    struct CountingListener {
        notifications: AtomicU32,
        latest_state: noxu_sync::Mutex<Option<NodeState>>,
    }

    impl StateChangeListener for CountingListener {
        fn on_state_change(&self, event: StateChangeEvent) {
            self.notifications.fetch_add(1, AtomicOrdering::SeqCst);
            let mut slot = self.latest_state.lock();
            *slot = Some(event.new_state);
        }
    }

    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    let listener = Arc::new(CountingListener {
        notifications: AtomicU32::new(0),
        latest_state: noxu_sync::Mutex::new(None),
    });
    let seen = || listener.notifications.load(AtomicOrdering::SeqCst);

    // Registration itself produces the first notification.
    node.set_state_change_listener(listener.clone());
    assert_eq!(seen(), 1);

    // A real transition produces the second one.
    node.become_master(1).unwrap();
    assert_eq!(seen(), 2);
    assert_eq!(*listener.latest_state.lock(), Some(NodeState::Master));
}
3586
/// `close()` must notify registered listeners of the transition into
/// `Shutdown`, and that notification must not arrive earlier.
#[test]
fn test_close_notifies_listeners() {
    /// Latches once a `Shutdown` transition is observed.
    struct ShutdownWatcher {
        saw_shutdown: AtomicBool,
    }

    impl StateChangeListener for ShutdownWatcher {
        fn on_state_change(&self, event: StateChangeEvent) {
            if event.new_state != NodeState::Shutdown {
                return;
            }
            self.saw_shutdown.store(true, AtomicOrdering::SeqCst);
        }
    }

    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    let watcher = Arc::new(ShutdownWatcher {
        saw_shutdown: AtomicBool::new(false),
    });
    let shutdown_seen = || watcher.saw_shutdown.load(AtomicOrdering::SeqCst);

    // Registration fires an initial callback for the current (Detached) state.
    node.set_state_change_listener(watcher.clone());

    // Reach Master first so that close() is a meaningful transition.
    node.become_master(1).unwrap();
    assert!(!shutdown_seen());

    node.close().unwrap();
    assert!(shutdown_seen());
}
3616
/// Only the master may shut down the whole group.
#[test]
fn test_shutdown_group_requires_master() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.become_replica("other").unwrap();
    assert!(node.shutdown_group(5000).is_err());
}
3625
/// A master can shut down the group, which also shuts down the local node.
#[test]
fn test_shutdown_group_as_master() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.become_master(1).unwrap();

    assert!(node.shutdown_group(5000).is_ok());
    assert!(node.is_shutdown());
}
3635
/// The configuration passed to `new()` is exposed unchanged.
#[test]
fn test_get_config() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    let cfg = node.get_config();
    assert_eq!(cfg.node_name, "node1");
    assert_eq!(cfg.group_name, "test_group");
}
3642
/// Smoke test: reading statistics from a fresh environment must not panic.
#[test]
fn test_get_stats() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    let _ = node.get_stats();
}
3649
3650 // -----------------------------------------------------------------------
3651 // TCP dispatcher tests (H-5 / H-7)
3652 // -----------------------------------------------------------------------
3653
3654 #[test]
3655 fn test_tcp_dispatcher_starts_on_new() {
3656 // Use port 0 so the OS assigns an ephemeral port.
3657 let env =
3658 ReplicatedEnvironment::new(test_config_port0("tcp_node")).unwrap();
3659 // The dispatcher must have started and bound a real port.
3660 let addr = env.bound_addr();
3661 assert!(addr.is_some(), "expected a bound address");
3662 let addr = addr.unwrap();
3663 assert_ne!(addr.port(), 0, "OS should assign a non-zero port");
3664 }
3665
/// The dispatcher started by `new()` is running until `close()`, and stopped
/// afterwards.
#[test]
fn test_tcp_dispatcher_stops_on_close() {
    let node =
        ReplicatedEnvironment::new(test_config_port0("tcp_node2")).unwrap();
    // A missing dispatcher counts as "not running".
    let dispatcher_running =
        || node.tcp_dispatcher.as_ref().is_some_and(|d| d.is_running());

    assert!(dispatcher_running());

    node.close().unwrap();

    assert!(
        !dispatcher_running(),
        "dispatcher should be stopped after close"
    );
}
3689
/// End-to-end check that the running dispatcher routes an inbound
/// connection to a registered service handler, and that the handler can
/// exchange messages with the client over the channel.
#[test]
fn test_tcp_dispatcher_accepts_connection() {
    use crate::net::Channel;
    use crate::net::ServiceHandler;
    use crate::net::service_dispatcher::connect_to_service;
    use std::sync::atomic::{AtomicU32, Ordering as AO};

    /// Counts invocations and echoes the first message it receives.
    struct PingHandler {
        count: AtomicU32,
    }
    impl ServiceHandler for PingHandler {
        fn service_name(&self) -> &str {
            "ping"
        }
        fn handle(&self, ch: Box<dyn Channel>) -> crate::error::Result<()> {
            self.count.fetch_add(1, AO::SeqCst);
            // Echo the first message back. A receive error or timeout simply
            // ends the session.
            if let Ok(Some(msg)) = ch.receive(Duration::from_secs(2)) {
                let _ = ch.send(&msg);
            }
            Ok(())
        }
    }

    let env =
        ReplicatedEnvironment::new(test_config_port0("tcp_node3")).unwrap();
    let addr = env.bound_addr().expect("dispatcher must be bound");

    // Fail loudly if no dispatcher exists. Skipping the body would let this
    // test pass without exercising anything.
    let disp = env
        .tcp_dispatcher
        .as_ref()
        .expect("dispatcher must be running after new()");

    // Register a ping handler on the running dispatcher.
    let handler = Arc::new(PingHandler { count: AtomicU32::new(0) });
    disp.register("ping", handler.clone());

    // Give the accept thread a moment.
    std::thread::sleep(Duration::from_millis(20));

    let client = connect_to_service(addr, "ping").unwrap();
    client.send(b"hello").unwrap();
    let reply = client.receive(Duration::from_secs(2)).unwrap();
    assert_eq!(reply, Some(b"hello".to_vec()));
    assert_eq!(handler.count.load(AO::SeqCst), 1);

    env.close().unwrap();
}
3737
/// The state machine requires Detached -> Unknown -> Master.
/// `become_master()` performs the intermediate hop on its own.
#[test]
fn test_become_master_auto_transitions_from_detached() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    let initial = node.get_state();
    assert_eq!(initial, NodeState::Detached);

    node.become_master(1).unwrap();
    let current = node.get_state();
    assert_eq!(current, NodeState::Master);
}
3747
/// The state machine requires Detached -> Unknown -> Replica.
/// `become_replica()` performs the intermediate hop on its own.
#[test]
fn test_become_replica_auto_transitions_from_detached() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    let initial = node.get_state();
    assert_eq!(initial, NodeState::Detached);

    node.become_replica("master_node").unwrap();
    let current = node.get_state();
    assert_eq!(current, NodeState::Replica);
}
3757
/// Even a former master cannot transfer mastership once it has been closed.
#[test]
fn test_cannot_transfer_master_when_shutdown() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    node.become_master(1).unwrap();
    node.close().unwrap();

    let outcome = node.transfer_master(MasterTransferConfig::new(
        "target".to_string(),
        Duration::from_secs(30),
    ));
    assert!(outcome.is_err());
}
3771
/// Walks one node through master -> replica (simulated failover) -> closed.
/// Along the way it exercises VLSN registration, ack recording and entry
/// application.
#[test]
fn test_full_lifecycle() {
    let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
    assert_eq!(node.get_state(), NodeState::Detached);

    // Phase 1: act as master, register two VLSNs and record replica acks.
    node.become_master(1).unwrap();
    assert!(node.is_master());
    for (vlsn, n) in [(1, 100), (2, 200)] {
        node.register_vlsn(vlsn, 0, n);
    }
    for vlsn in 1..=2 {
        node.record_ack(vlsn, "replica1");
    }

    // Phase 2: fail over to follow node2 and apply one entry from it.
    node.become_replica("node2").unwrap();
    assert!(node.is_replica());
    node.apply_entry(3, 0, vec![7, 8, 9]).unwrap();

    // Phase 3: shut down.
    node.close().unwrap();
    assert!(node.is_shutdown());
}
3802
/// Verify that `with_environment` lazily registers the RESTORE service on
/// the TCP dispatcher when `config.env_home` was not set at construction.
///
/// This mirrors `RepNode.envSetup()`, which registers the restore handler
/// when the environment is wired into the replicated node.
#[test]
fn test_restore_registered_lazily_via_with_environment() {
    use noxu_dbi::EnvironmentImpl;
    use tempfile::TempDir;

    let dir = TempDir::new().expect("temp dir");

    // Build config WITHOUT env_home — dispatcher starts, but no RESTORE handler yet.
    let config = RepConfig::builder("test_group", "node1", "127.0.0.1")
        .node_port(0)
        .build();

    let rep_env = ReplicatedEnvironment::new(config).unwrap();
    let is_restore_registered = || {
        rep_env
            .restore_registered
            .load(std::sync::atomic::Ordering::SeqCst)
    };

    assert!(
        !is_restore_registered(),
        "RESTORE must not be registered before an environment home is known"
    );

    // Wire in a real EnvironmentImpl so get_env_home() returns the temp dir.
    let env_impl = Arc::new(
        EnvironmentImpl::new(dir.path(), false, false).expect("open env"),
    );
    rep_env.with_environment(env_impl);

    assert!(
        is_restore_registered(),
        "with_environment must register the RESTORE service"
    );
}
3842
/// When `config.env_home` is supplied up front, `new()` registers the RESTORE
/// service immediately instead of deferring it to `with_environment`.
#[test]
fn test_restore_registered_eagerly_when_env_home_in_config() {
    use tempfile::TempDir;

    let home = TempDir::new().expect("temp dir");

    let rep_env = ReplicatedEnvironment::new(
        RepConfig::builder("test_group", "node2", "127.0.0.1")
            .node_port(0)
            .env_home(home.path())
            .build(),
    )
    .unwrap();

    let registered = rep_env
        .restore_registered
        .load(std::sync::atomic::Ordering::SeqCst);
    assert!(registered);
}
3865}