noxu_rep/replicated_environment.rs
1//! The main replicated environment API.
2//!
3//!
4//! A replicated database environment that is a node in a replication group.
5//! This is the entry point for replication. It wraps a standard Environment
6//! and adds replication capabilities including master election, replica
7//! streaming, and commit acknowledgments.
8//!
9//! # Replication node states
10//!
11//! The replication node state determines the operations that the application
12//! can perform against its replicated environment. The state transitions
13//! visible to the application can be summarized by the regular expression:
14//!
15//! ```text
16//! [ MASTER | REPLICA | UNKNOWN ]+ DETACHED
17//! ```
18//!
19//! When the first handle to a `ReplicatedEnvironment` is created and the node
20//! is brought up, the node usually establishes Master or Replica state. These
21//! states are preceded by the Unknown state. As various remote nodes become
22//! unavailable and elections are held, the local node may change between
23//! Master and Replica states, always with a (usually brief) transition through
24//! Unknown state.
25//!
26//! When the environment is closed, the node transitions to the Detached state.
27
28use noxu_dbi::{
29 AckWaitError, AckWaitErrorKind, EnvironmentImpl, ReplicaAckCoordinator,
30 ReplicaAckPolicyKind,
31};
32use noxu_sync::RwLock;
33use std::net::SocketAddr;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::OnceLock;
37use std::sync::Weak;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use crate::ack_tracker::AckTracker;
42use crate::elections::election_service::{
43 ELECTION_SERVICE_NAME, ElectionAcceptorState, ElectionService,
44};
45use crate::elections::master_tracker::MasterTracker;
46use crate::error::{RepError, Result};
47use crate::group_service::GroupService;
48use crate::master_transfer::MasterTransferConfig;
49use crate::net::service_dispatcher::{
50 AnyServiceDispatcher, TcpServiceDispatcher,
51};
52use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
53use crate::network_restore_server::{
54 NetworkRestoreServer, RESTORE_SERVICE_NAME,
55};
56use crate::node_state::{NodeState, NodeStateMachine};
57use crate::rep_config::RepConfig;
58use crate::rep_stats::RepStats;
59use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
60use crate::stream::feeder::Feeder;
61use crate::stream::peer_feeder::{
62 PEER_FEEDER_SERVICE_NAME, PeerFeederService, PeerLogScanner,
63};
64use crate::stream::replica_stream::{EnvironmentLogWriter, ReplicaStream};
65use crate::vlsn::vlsn_index::VlsnIndex;
66use crate::vlsn::vlsn_range::VlsnRange;
67
/// Default heartbeat timeout for master liveness detection.
///
/// Passed to `MasterTracker::new` when a `ReplicatedEnvironment` is
/// constructed.
const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);
70
/// A replicated database environment.
///
/// This is the entry point for replication. It wraps a standard Environment
/// and adds replication capabilities including master election, replica
/// streaming, and commit acknowledgments.
///
/// High Availability (HA) provides a replicated, embedded database
/// management system which provides fast, reliable, and scalable data
/// management. HA enables replication of an environment across a Replication
/// Group. A `ReplicatedEnvironment` is a single node in the replication group.
///
/// `ReplicatedEnvironment` wraps a standard `Environment`. All database
/// operations are executed in the same fashion in both replicated and
/// non-replicated applications. A `ReplicatedEnvironment` must be
/// transactional. All replicated databases created in the replicated
/// environment must be transactional as well.
///
/// [`ReplicatedEnvironment::open`] is the standard lifecycle entry point: it
/// constructs the node and starts the election driver that resolves the node
/// into Master or Replica state. [`ReplicatedEnvironment::new`] only
/// constructs the handle and registers its network services; callers that use
/// it drive state transitions themselves.
///
/// Replicated environments can be created with node type Electable or
/// Secondary. Electable nodes can be masters or replicas, and participate in
/// both master elections and commit durability decisions. Secondary nodes can
/// only be replicas, not masters, and do not participate in either elections or
/// durability decisions.
///
/// # Example
///
/// ```ignore
/// use noxu_rep::{ReplicatedEnvironment, RepConfig};
///
/// let config = RepConfig::builder("my_group", "node1", "localhost")
///     .node_port(5001)
///     .build();
/// let rep_env = ReplicatedEnvironment::new(config).unwrap();
/// ```
pub struct ReplicatedEnvironment {
    /// The replication configuration for this node.
    config: RepConfig,
    /// Tracks the current node state (Detached, Unknown, Master, Replica).
    node_state: NodeStateMachine,
    /// Service for managing the replication group membership.
    group_service: GroupService,
    /// Maps VLSNs to log file positions.
    ///
    /// Wrapped in `Arc` so that background daemons (election driver,
    /// VLSN-index persistence flusher) can share access without
    /// borrowing the env. Closes finding F11
    /// (`docs/src/internal/api-audit-2026-05-rep.md`).
    vlsn_index: Arc<VlsnIndex>,
    /// Tracks acknowledgments from replicas (used by master).
    ack_tracker: AckTracker,
    /// Replication statistics.
    stats: RepStats,
    /// Active feeder threads (master -> replica streams).
    feeders: RwLock<Vec<Feeder>>,
    /// Replica stream for receiving updates from the master.
    replica_stream: ReplicaStream,
    /// Tracks the current master node.
    master_tracker: MasterTracker,
    /// State change listeners.
    listeners: RwLock<Vec<Arc<dyn StateChangeListener>>>,
    /// Shutdown flag.
    shutdown: AtomicBool,
    /// Service dispatcher — listens on the replication port and routes
    /// incoming connections to the appropriate service handler (feeder, etc.).
    ///
    /// `Plain`: plain TCP (default / Phase-2 behaviour).
    /// `Tls`: TLS + mTLS enforcement (Phase 3, when `RepConfig::tls_config` is set
    /// and `transport_kind` is `Tls`).
    ///
    /// `None` when the listen address could not be parsed or the dispatcher
    /// failed to start during construction.
    tcp_dispatcher: Option<AnyServiceDispatcher>,
    /// The address the `tcp_dispatcher` is actually bound to (may differ from
    /// the configured port when port 0 is used in tests). `None` whenever
    /// `tcp_dispatcher` is `None`.
    bound_addr: Option<SocketAddr>,

    /// Optional live `EnvironmentImpl` wired in via `with_environment`.
    ///
    /// When set, `become_master` spawns a `FeederRunner` per replica using
    /// `EnvironmentLogScanner`, and `become_replica` spawns a
    /// `ReplicaReceiver` thread using `EnvironmentLogWriter`.
    ///
    /// The VLSN persistence daemon also reads this to find the last
    /// checkpoint end LSN used to cap each flush.
    env_impl: StdMutex<Option<Arc<EnvironmentImpl>>>,

    /// Background thread handles: I/O threads spawned during state
    /// transitions plus the election driver and VLSN persistence daemon.
    ///
    /// Stored so that `close()` can join them cleanly. The daemons' "already
    /// started" checks look for their thread-name prefixes
    /// (`noxu-election-`, `noxu-vlsn-flush-`) among these handles.
    io_threads: StdMutex<Vec<std::thread::JoinHandle<()>>>,

    /// Shutdown flag shared with I/O threads so they terminate when the
    /// environment is closed.
    io_shutdown: AtomicBool,

    /// Whether the RESTORE service has been registered on the TCP dispatcher.
    ///
    /// When `config.env_home` is `None` at construction time, registration is
    /// deferred until `with_environment()` provides the env home path.
    restore_registered: AtomicBool,

    /// In-memory log queue used by the peer feeder service.
    ///
    /// When this node is a replica, `apply_entry()` pushes each received log
    /// entry here. The `PeerFeederService` registered on the TCP dispatcher
    /// reads from this queue to stream entries to downstream replicas that
    /// are behind this node (peer-to-peer log distribution, HA style).
    peer_scanner: Arc<PeerLogScanner>,

    /// Monotonic sequence used by `await_replica_acks` to assign unique
    /// keys to in-flight commits awaiting replica acknowledgment. In
    /// production this should track the real master VLSN; until F11
    /// closes the VLSN<->commit linkage, the coordinator uses a
    /// synthetic sequence so that ack tracking is unique per commit.
    /// See finding F1 in `docs/src/internal/api-audit-2026-05-rep.md`.
    commit_ack_seq: std::sync::atomic::AtomicU64,

    /// Shared acceptor state used by the ELECTION service handler.
    /// The election driver updates `own_vlsn` / `own_term` here as the
    /// node progresses; incoming acceptor sessions read it on every
    /// connection so their replies always reflect the local node's
    /// most recent state. Closes finding F6.
    election_state: Arc<ElectionAcceptorState>,

    /// Self-referential `Weak` populated once the env has been wrapped
    /// in an `Arc`. Used by the replica I/O thread spawned in
    /// `become_replica` so it can call `bootstrap_via_dispatcher` when
    /// the master signals `NeedsRestore`.
    ///
    /// Populated via [`Self::init_self_weak`] from `open()` and
    /// the test harness. When unset (callers that build the env via
    /// raw `Arc::new(Self::new(...))` and never call `init_self_weak`)
    /// the I/O thread falls back to operator-driven bootstrap.
    self_weak: OnceLock<Weak<Self>>,
}
210
211impl ReplicatedEnvironment {
212 /// Create a new replicated environment.
213 ///
214 ///
215 ///
216 /// Creates a replicated environment handle and starts participating in the
217 /// replication group. The node's state is determined when it joins the
218 /// group, and mastership is not preconfigured. If the group has no current
219 /// master, creation will trigger an election to determine whether this node
220 /// will participate as a Master or a Replica.
221 ///
222 /// A brand new node will always join an existing group as a Replica, unless
223 /// it is the very first electable node that is creating the group. In that
224 /// case it joins as the Master of the newly formed singleton group.
225 pub fn new(config: RepConfig) -> Result<Self> {
226 // mTLS Phase 2 (v3.1.0): peer_allowlist enforcement is real at the
227 // TLS channel layer (TlsTcpChannelListener::bind_with_tls_and_allowlist).
228 // Phase 3 (this release): when RepConfig::tls_config is set AND
229 // transport_kind is Tls, the service dispatcher itself enforces mTLS
230 // via TlsTcpServiceDispatcher. For the remaining cases (no TlsConfig
231 // or non-TLS transport) keep the Phase-2 accurate warn.
232 if !config.peer_allowlist.is_empty() {
233 match config.transport_kind {
234 crate::rep_config::RepTransportKind::Tls => {
235 if config.tls_config.is_some() {
236 log::info!(
237 "[{}] peer_allowlist ({} entries) + tls_config set; \
238 mTLS will be enforced on the service dispatcher.",
239 config.node_name,
240 config.peer_allowlist.len(),
241 );
242 } else {
243 log::info!(
244 "[{}] peer_allowlist configured ({} entries) but \
245 tls_config is None — the service dispatcher will \
246 use plain TCP. Set RepConfig::tls_config to \
247 activate end-to-end mTLS on this path.",
248 config.node_name,
249 config.peer_allowlist.len(),
250 );
251 }
252 }
253 _ => {
254 log::warn!(
255 "[{}] peer_allowlist is configured ({} entries) but \
256 transport_kind is not Tls — the allowlist has no \
257 effect without TLS transport. Set \
258 RepTransportKind::Tls to activate mTLS enforcement.",
259 config.node_name,
260 config.peer_allowlist.len(),
261 );
262 }
263 }
264 }
265 let node_state = NodeStateMachine::new();
266 let group_service = GroupService::new(config.group_name.clone());
267 let vlsn_index = {
268 // F11: try to load a previously persisted vlsn.idx from
269 // env_home if one exists. A successfully loaded index lets a
270 // restarted replica resume from where it left off without a
271 // full network restore; a missing or corrupt file falls back
272 // to a fresh in-memory index (caller will need to bootstrap).
273 if let Some(ref home) = config.env_home {
274 match crate::vlsn::persist::load_from_disk(home) {
275 Ok(Some(idx)) => {
276 log::info!(
277 "Node '{}' loaded persisted VLSN index from {} \
278 ({} entries, latest vlsn={})",
279 config.node_name,
280 home.display(),
281 idx.snapshot_entries().len(),
282 idx.get_latest_vlsn(),
283 );
284 Arc::new(idx)
285 }
286 Ok(None) => Arc::new(VlsnIndex::new(10)),
287 Err(e) => {
288 log::warn!(
289 "Node '{}' failed to load persisted VLSN index \
290 from {}: {} (treating as fresh node — network \
291 restore required)",
292 config.node_name,
293 home.display(),
294 e
295 );
296 // Best-effort: remove the corrupt file so the
297 // next persist cycle writes a clean one. A
298 // missing file is the "fresh node" baseline.
299 let _ = std::fs::remove_file(
300 crate::vlsn::persist::index_path(home),
301 );
302 Arc::new(VlsnIndex::new(10))
303 }
304 }
305 } else {
306 Arc::new(VlsnIndex::new(10))
307 }
308 };
309 let ack_tracker = AckTracker::new();
310 let stats = RepStats::new();
311 let feeders = RwLock::new(Vec::new());
312 let replica_stream = ReplicaStream::new();
313 let master_tracker = MasterTracker::new(DEFAULT_HEARTBEAT_TIMEOUT);
314
315 // Start the service dispatcher.
316 //
317 // Phase 3: when RepConfig::tls_config is set AND transport_kind is Tls,
318 // start a TlsTcpServiceDispatcher (mTLS enforced). Otherwise fall back
319 // to the plain-TCP TcpServiceDispatcher.
320 let listen_addr_str =
321 format!("{}:{}", config.node_host, config.node_port);
322 let mut restore_registered_init = false;
323
324 // Returns (AnyServiceDispatcher, bound_addr) or (None, None) on error.
325 let (tcp_dispatcher, bound_addr) = match listen_addr_str
326 .parse::<SocketAddr>()
327 {
328 Ok(addr) => {
329 let build_result: Result<(AnyServiceDispatcher, SocketAddr)> =
330 Self::build_dispatcher(&config, addr);
331 match build_result {
332 Ok((dispatcher, bound)) => {
333 // Register the network restore handler.
334 if let Some(ref home) = config.env_home {
335 let restore_server =
336 NetworkRestoreServer::new(home.clone());
337 dispatcher.register(
338 RESTORE_SERVICE_NAME,
339 Arc::new(restore_server),
340 );
341 log::debug!(
342 "Node '{}' RESTORE service registered \
343 (env_home={})",
344 config.node_name,
345 home.display(),
346 );
347 restore_registered_init = true;
348 }
349 let kind =
350 if dispatcher.is_tls() { "TLS" } else { "TCP" };
351 log::info!(
352 "Node '{}' {} service dispatcher started on {}",
353 config.node_name,
354 kind,
355 bound
356 );
357 (Some(dispatcher), Some(bound))
358 }
359 Err(e) => {
360 log::warn!(
361 "Node '{}' failed to start service dispatcher \
362 on {}: {}",
363 config.node_name,
364 listen_addr_str,
365 e
366 );
367 (None, None)
368 }
369 }
370 }
371 Err(e) => {
372 log::warn!(
373 "Node '{}' cannot parse listen address '{}': {}",
374 config.node_name,
375 listen_addr_str,
376 e
377 );
378 (None, None)
379 }
380 };
381
382 // Build the in-memory peer log scanner; register the peer feeder
383 // service on the dispatcher so downstream replicas can connect.
384 let peer_scanner = Arc::new(PeerLogScanner::new());
385 // F5/F31: build the acceptor state with persistence enabled when
386 // env_home is configured. Crash-durable promises are required
387 // for the Paxos safety invariant after a process restart.
388 let election_state =
389 Arc::new(if let Some(ref home) = config.env_home {
390 ElectionAcceptorState::with_env_home(
391 config.node_name.clone(),
392 1,
393 home,
394 )
395 } else {
396 ElectionAcceptorState::new(config.node_name.clone(), 1)
397 });
398 if let Some(ref dispatcher) = tcp_dispatcher {
399 let service = PeerFeederService::new(Arc::clone(&peer_scanner));
400 dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(service));
401 log::debug!(
402 "Node '{}' PEER_FEEDER service registered",
403 config.node_name,
404 );
405 // F6: register the ELECTION service so peers can run
406 // run_acceptor against this node when proposing.
407 let election_svc =
408 Arc::new(ElectionService::new(Arc::clone(&election_state)));
409 dispatcher.register(ELECTION_SERVICE_NAME, election_svc);
410 log::debug!(
411 "Node '{}' ELECTION service registered",
412 config.node_name,
413 );
414 }
415
416 let env = Self {
417 config,
418 node_state,
419 group_service,
420 vlsn_index,
421 ack_tracker,
422 stats,
423 feeders,
424 replica_stream,
425 master_tracker,
426 listeners: RwLock::new(Vec::new()),
427 shutdown: AtomicBool::new(false),
428 tcp_dispatcher,
429 bound_addr,
430 env_impl: StdMutex::new(None),
431 io_threads: StdMutex::new(Vec::new()),
432 io_shutdown: AtomicBool::new(false),
433 restore_registered: AtomicBool::new(restore_registered_init),
434 peer_scanner,
435 commit_ack_seq: std::sync::atomic::AtomicU64::new(1),
436 election_state,
437 self_weak: OnceLock::new(),
438 };
439
440 Ok(env)
441 }
442
443 /// Open a replicated environment with the standard production
444 /// lifecycle.
445 ///
446 /// This is the entry point recommended by the mdBook chapters:
447 /// it allocates the `ReplicatedEnvironment`, registers all
448 /// services on the TCP dispatcher, and spawns the **election
449 /// driver** background thread that runs Paxos rounds against
450 /// known peers until the node has resolved into either Master or
451 /// Replica state.
452 ///
453 /// Closes finding F6 of `docs/src/internal/api-audit-2026-05-rep.md`.
454 ///
455 /// Use [`ReplicatedEnvironment::new`] directly only when the
456 /// caller plans to drive state transitions explicitly (test
457 /// harnesses, scripted bootstrap, recovery tooling).
458 pub fn open(config: RepConfig) -> Result<Arc<Self>> {
459 let env = Arc::new(Self::new(config)?);
460 env.init_self_weak();
461 env.start_election_driver();
462 env.start_vlsn_persistence_daemon();
463 env.register_admin_service();
464 Ok(env)
465 }
466
467 /// Build the service dispatcher for this node.
468 ///
469 /// Phase 3 logic: when `config.transport_kind == Tls` AND
470 /// `config.tls_config` is `Some`, start a
471 /// [`crate::net::service_dispatcher::TlsTcpServiceDispatcher`] that
472 /// enforces mTLS with the configured `peer_allowlist`. Otherwise
473 /// start the plain-TCP [`TcpServiceDispatcher`].
474 ///
475 /// Returns `(dispatcher, bound_addr)` or a `RepError` on bind / TLS
476 /// config failure.
477 fn build_dispatcher(
478 #[cfg_attr(not(feature = "tls-rustls"), allow(unused_variables))]
479 config: &RepConfig,
480 addr: SocketAddr,
481 ) -> Result<(AnyServiceDispatcher, SocketAddr)> {
482 #[cfg(feature = "tls-rustls")]
483 if config.transport_kind == crate::rep_config::RepTransportKind::Tls {
484 use crate::auth::PeerAllowlist;
485 use crate::net::service_dispatcher::TlsTcpServiceDispatcher;
486 let tls = config.tls_config.as_ref().ok_or_else(|| {
487 RepError::ConfigError(format!(
488 "node '{}': transport_kind=Tls requires a tls_config",
489 config.node_name,
490 ))
491 })?;
492 let allowlist =
493 PeerAllowlist::new(config.peer_allowlist.iter().cloned());
494 // Fail-closed: an empty allowlist with TLS transport is a
495 // misconfiguration. The same policy is enforced at the TLS
496 // listener and QUIC constructors; downgrading to plain TCP here
497 // would be a silent security regression for a node that asked
498 // for TLS.
499 if allowlist.is_empty() {
500 return Err(RepError::ConfigError(format!(
501 "node '{}': transport_kind=Tls requires a non-empty \
502 peer_allowlist (mTLS enforcement is fail-closed)",
503 config.node_name,
504 )));
505 }
506 let disp = TlsTcpServiceDispatcher::new(addr, tls, allowlist)?;
507 let bound = disp.start()?;
508 return Ok((AnyServiceDispatcher::Tls(disp), bound));
509 }
510 // Plain-TCP dispatcher (default or when TLS config is missing).
511 let disp = TcpServiceDispatcher::new(addr).map_err(|e| {
512 RepError::NetworkError(format!("TCP dispatcher init: {e}"))
513 })?;
514 let bound = disp.start()?;
515 Ok((AnyServiceDispatcher::Plain(disp), bound))
516 }
517
518 /// Populate the env's self-referential `Weak` so background
519 /// threads can obtain a back-reference for auto-orchestrated
520 /// follow-up actions (e.g. replica auto-bootstrap on
521 /// `NeedsRestore`). Idempotent: subsequent calls are silent
522 /// no-ops because the inner [`OnceLock`] only accepts one set.
523 ///
524 /// Callers that wrap the env in `Arc` and want auto-bootstrap
525 /// behaviour should call this immediately after construction.
526 /// `Self::open` already does so. Test harnesses that drive
527 /// transitions manually (`RepTestBase`) also call this so the
528 /// auto-bootstrap path is exercised in tests.
529 pub fn init_self_weak(self: &Arc<Self>) {
530 let _ = self.self_weak.set(Arc::downgrade(self));
531 }
532
533 /// Register the `ADMIN` service handler on the TCP dispatcher.
534 ///
535 /// Closes findings F7 / F8. Holds a `Weak<Self>` so the handler
536 /// does not extend the env's lifetime. Idempotent: re-registering
537 /// is harmless because `TcpServiceDispatcher::register` overwrites
538 /// the existing handler.
539 pub fn register_admin_service(self: &Arc<Self>) {
540 if let Some(ref dispatcher) = self.tcp_dispatcher {
541 crate::group_admin::register_admin_service(
542 dispatcher,
543 Arc::downgrade(self),
544 );
545 log::debug!(
546 "Node '{}' ADMIN service registered",
547 self.config.node_name,
548 );
549 }
550 }
551
552 /// Spawn the VLSN-index persistence daemon (F11).
553 ///
554 /// Periodically (every 2 seconds) snapshots the in-memory
555 /// `VlsnIndex` to `<env_home>/vlsn.idx` so a clean restart can
556 /// resume from where the replica left off without a full network
557 /// restore. No-op when `config.env_home` is `None`.
558 ///
559 /// Idempotent: only one daemon is ever spawned per env.
560 pub fn start_vlsn_persistence_daemon(self: &Arc<Self>) {
561 let Some(home) = self.config.env_home.clone() else {
562 return;
563 };
564 {
565 let threads = self.io_threads.lock().unwrap();
566 if threads.iter().any(|h| {
567 h.thread()
568 .name()
569 .is_some_and(|n| n.starts_with("noxu-vlsn-flush-"))
570 }) {
571 return;
572 }
573 }
574
575 let vlsn_index = Arc::clone(&self.vlsn_index);
576 let name = format!("noxu-vlsn-flush-{}", self.config.node_name);
577 let me = Arc::clone(self);
578 let interval = Duration::from_secs(2);
579
580 let handle = std::thread::Builder::new()
581 .name(name)
582 .spawn(move || {
583 use std::sync::atomic::Ordering;
584 let mut last_persisted_vlsn: u64 = 0;
585 while !me.io_shutdown.load(Ordering::SeqCst)
586 && !me.is_shutdown()
587 {
588 std::thread::sleep(interval);
589 if me.io_shutdown.load(Ordering::SeqCst) {
590 break;
591 }
592 let latest = vlsn_index.get_latest_vlsn();
593 if latest == last_persisted_vlsn {
594 // Nothing new to flush.
595 continue;
596 }
597 // X-2: cap the flush at the last durable checkpoint's
598 // end LSN so the persisted VLSN index never claims
599 // VLSNs beyond the durable B-tree state. After a crash
600 // the recovered tree and the index will be coherent.
601 let cap_lsn = me
602 .env_impl
603 .lock()
604 .unwrap()
605 .as_ref()
606 .and_then(|e| e.get_checkpointer())
607 .map(|c| c.get_last_checkpoint_end())
608 .unwrap_or(noxu_util::NULL_LSN);
609 match crate::vlsn::persist::flush_to_disk_capped(
610 &vlsn_index,
611 &home,
612 cap_lsn,
613 ) {
614 Ok(n) => {
615 log::trace!(
616 "vlsn-flush: persisted {} entries (latest vlsn={}, cap_lsn={:?})",
617 n,
618 latest,
619 cap_lsn,
620 );
621 last_persisted_vlsn = latest;
622 }
623 Err(e) => {
624 log::warn!(
625 "vlsn-flush: failed to persist VLSN index to {}: {}",
626 home.display(),
627 e
628 );
629 }
630 }
631 }
632 // Final flush on shutdown so a clean close is recoverable.
633 // Cap at the last checkpoint even for the shutdown flush.
634 let cap_lsn = me
635 .env_impl
636 .lock()
637 .unwrap()
638 .as_ref()
639 .and_then(|e| e.get_checkpointer())
640 .map(|c| c.get_last_checkpoint_end())
641 .unwrap_or(noxu_util::NULL_LSN);
642 if let Err(e) = crate::vlsn::persist::flush_to_disk_capped(
643 &vlsn_index,
644 &home,
645 cap_lsn,
646 ) {
647 log::warn!(
648 "vlsn-flush (final): failed to persist VLSN index: {}",
649 e
650 );
651 }
652 })
653 .expect("failed to spawn noxu-vlsn-flush thread");
654
655 self.io_threads.lock().unwrap().push(handle);
656 log::debug!(
657 "Node '{}' VLSN persistence daemon started",
658 self.config.node_name,
659 );
660 }
661
662 /// Spawn the election driver background thread.
663 ///
664 /// While the env is in `Detached` or `Unknown` state and no master
665 /// is known, the driver periodically attempts a Paxos election
666 /// against peers in `GroupService` (whose ELECTION services were
667 /// registered at `open()` time). On success the driver calls
668 /// `become_master` (if this node is the winner) or `become_replica`
669 /// (otherwise). On failure (no quorum), the driver waits
670 /// `config.election_timeout` and tries again.
671 ///
672 /// The driver respects `io_shutdown`; on env close the loop exits
673 /// promptly.
674 ///
675 /// Idempotent: a second call is a no-op (only one driver thread is
676 /// ever spawned per env).
677 pub fn start_election_driver(self: &Arc<Self>) {
678 use std::sync::atomic::Ordering;
679 // Reuse io_shutdown for cancellation; a successful spawn is
680 // recorded by appending to io_threads, so a duplicate call
681 // would re-add a thread — we use a one-shot `AtomicBool`
682 // sentinel placed in the io_shutdown's slot via a new field.
683 // Cheaper: a static name check on io_threads is impossible;
684 // instead, gate spawning on whether any io_thread already
685 // carries the driver name.
686 {
687 let threads = self.io_threads.lock().unwrap();
688 if threads.iter().any(|h| {
689 h.thread()
690 .name()
691 .is_some_and(|n| n.starts_with("noxu-election-"))
692 }) {
693 return;
694 }
695 }
696
697 let me = Arc::clone(self);
698 let name = format!("noxu-election-{}", self.config.node_name);
699 let handle = std::thread::Builder::new()
700 .name(name)
701 .spawn(move || {
702 me.run_election_loop();
703 })
704 .expect("failed to spawn election driver thread");
705 self.io_threads.lock().unwrap().push(handle);
706 log::debug!("Node '{}' election driver started", self.config.node_name,);
707 // Keep ordering sane on the io_shutdown flag.
708 let _ = self.io_shutdown.load(Ordering::SeqCst);
709 }
710
711 /// Body of the election driver loop. Public only for tests; called
712 /// by [`Self::start_election_driver`].
713 fn run_election_loop(self: Arc<Self>) {
714 use std::sync::atomic::Ordering;
715 // Maintain an internal monotonically increasing election term.
716 // Each successful or failed round bumps the term so retries do
717 // not collide with stale acceptor promises.
718 let mut term: u64 = 1;
719
720 loop {
721 if self.io_shutdown.load(Ordering::SeqCst) {
722 return;
723 }
724 if self.is_shutdown() {
725 return;
726 }
727
728 let state = self.node_state.get_state();
729 // Stop driving once we've resolved into Master/Replica;
730 // re-arm only if the node returns to Unknown.
731 if matches!(state, NodeState::Master | NodeState::Replica) {
732 std::thread::sleep(Duration::from_millis(200));
733 continue;
734 }
735 if matches!(state, NodeState::Shutdown) {
736 return;
737 }
738
739 // Probe peers for an active master via the existing
740 // GroupService cache. In the absence of a heartbeat path
741 // we rely on master_tracker (set by become_replica from
742 // the receive loop).
743 if let Some(master_name) = self.master_tracker.get_master()
744 && master_name != self.config.node_name
745 {
746 let _ = self.become_replica(&master_name);
747 continue;
748 }
749
750 // Snapshot peers to dial for ELECTION.
751 let peers: Vec<(String, SocketAddr)> = self
752 .group_service
753 .get_all_nodes()
754 .into_iter()
755 .filter(|n| n.name != self.config.node_name)
756 .filter_map(|n| {
757 format!("{}:{}", n.host, n.port)
758 .parse::<SocketAddr>()
759 .ok()
760 .map(|a| (n.name, a))
761 })
762 .collect();
763
764 // Build the local rep group view used by run_election to
765 // compute quorum and resolve the winner name. Include
766 // self.
767 let group = self.local_rep_group_with_self();
768
769 // Update election state for any concurrent acceptor calls.
770 let our_vlsn = self.vlsn_index.get_latest_vlsn();
771 self.election_state.set_vlsn(our_vlsn);
772 self.election_state.set_term(term);
773
774 // Connect to each peer's ELECTION service. Failures are
775 // tolerated: a peer that doesn't answer simply contributes
776 // no vote. The election may still reach quorum in the
777 // remaining peers.
778 let mut channels: Vec<Arc<dyn crate::net::channel::Channel>> =
779 Vec::new();
780 for (peer_name, addr) in &peers {
781 match crate::net::service_dispatcher::connect_to_service(
782 *addr,
783 ELECTION_SERVICE_NAME,
784 ) {
785 Ok(ch) => {
786 let arc: Arc<dyn crate::net::channel::Channel> =
787 Arc::new(ch);
788 channels.push(arc);
789 }
790 Err(e) => {
791 log::trace!(
792 "election driver: peer {} ({}) unreachable: {}",
793 peer_name,
794 addr,
795 e
796 );
797 }
798 }
799 }
800
801 // Resolve our own node_id from the group; if not present
802 // we cannot run an election (closed-world guard — see F22).
803 let self_node_id =
804 group.get_node(&self.config.node_name).map(|n| n.node_id());
805 let self_node_id = match self_node_id {
806 Some(id) => id,
807 None => {
808 log::warn!(
809 "election driver: node '{}' not registered in \
810 own group view; sleeping",
811 self.config.node_name
812 );
813 std::thread::sleep(Duration::from_millis(200));
814 continue;
815 }
816 };
817
818 log::debug!(
819 "election driver on '{}': starting term={} with {} peers",
820 self.config.node_name,
821 term,
822 channels.len(),
823 );
824 let outcome = crate::elections::paxos::run_election(
825 self_node_id,
826 &self.config.node_name,
827 &group,
828 &channels,
829 our_vlsn,
830 /* priority */ 1,
831 term,
832 );
833
834 match outcome {
835 Some(winner_id) if winner_id == self_node_id => {
836 if let Err(e) = self.become_master(term) {
837 log::warn!(
838 "election driver: become_master failed: {}",
839 e
840 );
841 } else {
842 log::info!(
843 "election driver: '{}' became master at term {}",
844 self.config.node_name,
845 term,
846 );
847 }
848 }
849 Some(winner_id) => {
850 if let Some(winner_node) = group
851 .get_nodes()
852 .into_iter()
853 .find(|n| n.node_id() == winner_id)
854 {
855 if let Err(e) = self.become_replica(&winner_node.name) {
856 log::warn!(
857 "election driver: become_replica failed: {}",
858 e
859 );
860 } else {
861 log::info!(
862 "election driver: '{}' became replica of '{}' at term {}",
863 self.config.node_name,
864 winner_node.name,
865 term,
866 );
867 }
868 }
869 }
870 None => {
871 log::debug!(
872 "election driver on '{}' term={}: no quorum",
873 self.config.node_name,
874 term,
875 );
876 }
877 }
878
879 term = term.saturating_add(1);
880 // Back off so we don't pin the loop on transient failures.
881 std::thread::sleep(
882 self.config.election_timeout.min(Duration::from_millis(500)),
883 );
884 }
885 }
886
887 /// Internal: a `RepGroup` snapshot that includes self.
888 fn local_rep_group_with_self(&self) -> crate::rep_group::RepGroup {
889 let mut group = self.get_rep_group();
890 // Ensure self is present in the group view; the
891 // group_service does not auto-register the local node.
892 if group.get_node(&self.config.node_name).is_none() {
893 let mut self_node = crate::rep_node::RepNode::new(
894 self.config.node_name.clone(),
895 self.config.node_type,
896 self.config.node_host.clone(),
897 self.config.node_port,
898 /* node_id */ 0,
899 );
900 // Stable self node_id derived from the name hash so
901 // re-creations in the same process don't collide.
902 use std::hash::{Hash, Hasher};
903 let mut hasher = std::collections::hash_map::DefaultHasher::new();
904 self.config.node_name.hash(&mut hasher);
905 // Restrict to a u32 range and avoid 0 (reserved for
906 // "unknown").
907 let id = ((hasher.finish() as u32) | 1).max(1);
908 self_node.node_id = id;
909 group.add_node(self_node);
910 }
911 group
912 }
913
914 /// Return the socket address the TCP service dispatcher is bound to.
915 ///
916 /// This may differ from the configured `node_port` when port 0 is used
917 /// (the OS assigns an ephemeral port). Returns `None` if the dispatcher
918 /// could not be started (e.g. the address is not resolvable).
919 pub fn bound_addr(&self) -> Option<SocketAddr> {
920 self.bound_addr
921 }
922
923 /// Wire a live `EnvironmentImpl` into this replicated environment.
924 ///
925 /// After this call, state transitions (`become_master`, `become_replica`)
926 /// will spawn real feeder/receiver I/O threads backed by the live log.
927 ///
928 /// If the RESTORE service was not registered at construction time (because
929 /// `config.env_home` was `None`), it is registered here using the
930 /// environment's actual home path. This mirrors`RepNode.envSetup()`
931 /// which registers the restore handler during environment wiring.
932 ///
933 /// Environment reference wiring.
934 /// `EnvironmentImpl` via `RepImpl.repNode.envImpl` in HA.
935 pub fn with_environment(&self, env: Arc<EnvironmentImpl>) {
936 // Register RESTORE service lazily if not already done.
937 if !self.restore_registered.load(Ordering::SeqCst)
938 && let Some(ref dispatcher) = self.tcp_dispatcher
939 {
940 let env_home = env.get_env_home().to_path_buf();
941 let restore_server = NetworkRestoreServer::new(env_home.clone());
942 dispatcher.register(RESTORE_SERVICE_NAME, Arc::new(restore_server));
943 self.restore_registered.store(true, Ordering::SeqCst);
944 log::debug!(
945 "Node '{}' RESTORE service registered via with_environment \
946 (env_home={})",
947 self.config.node_name,
948 env_home.display(),
949 );
950 }
951
952 // X-14: rebuild the VLSN index from recovery-replayed LN entries.
953 // After a crash the on-disk vlsn.idx may be stale (either ahead of
954 // the recovered B-tree, or behind if vlsn.idx was not flushed
955 // after the last checkpoint). Re-registering all (vlsn, lsn) pairs
956 // from the redo pass gives a consistent in-memory index.
957 if !env.recovery_vlsns.is_empty() {
958 log::info!(
959 "Node '{}': rebuilding VLSN index from {} recovered entries",
960 self.config.node_name,
961 env.recovery_vlsns.len(),
962 );
963 for &(vlsn, lsn_u64) in &env.recovery_vlsns {
964 let lsn = noxu_util::Lsn::from_u64(lsn_u64);
965 self.vlsn_index.register(
966 vlsn,
967 lsn.file_number(),
968 lsn.file_offset(),
969 );
970 }
971 }
972
973 // X-1: truncate the VLSN index to the rollback matchpoint if recovery
974 // detected a completed rollback period. The matchpoint is the highest
975 // LSN that is still valid after the rollback; entries with higher VLSNs
976 // correspond to data that was rolled back and must not appear in the
977 // index.
978 if let Some(matchpoint_lsn_u64) = env.recovery_rollback_matchpoint {
979 // Find the latest VLSN whose LSN is at or before the matchpoint.
980 // Scan the recovered VLSN pairs (sorted ascending) to find the
981 // boundary.
982 let safe_vlsn = env
983 .recovery_vlsns
984 .iter()
985 .rev()
986 .find(|&&(_, lsn_u64)| lsn_u64 <= matchpoint_lsn_u64)
987 .map(|&(vlsn, _)| vlsn)
988 .unwrap_or(0);
989 log::info!(
990 "Node '{}': truncating VLSN index after vlsn={} \
991 (rollback matchpoint lsn={:#x})",
992 self.config.node_name,
993 safe_vlsn,
994 matchpoint_lsn_u64,
995 );
996 self.vlsn_index.truncate_after(safe_vlsn);
997 }
998
999 *self.env_impl.lock().unwrap() = Some(env);
1000 }
1001
1002 /// Get the current node state.
1003 ///
1004 ///
1005 ///
1006 /// Returns the current state of the node associated with this replication
1007 /// environment. If the caller's intent is to track the state of the node,
1008 /// `StateChangeListener` may be a more convenient and efficient approach.
1009 pub fn get_state(&self) -> NodeState {
1010 self.node_state.get_state()
1011 }
1012
1013 /// Check if this node is the master.
1014 ///
1015 /// Returns true if the node's current state is Master.
1016 pub fn is_master(&self) -> bool {
1017 self.node_state.get_state() == NodeState::Master
1018 }
1019
1020 /// Check if this node is a replica.
1021 ///
1022 /// Returns true if the node's current state is Replica.
1023 pub fn is_replica(&self) -> bool {
1024 self.node_state.get_state() == NodeState::Replica
1025 }
1026
1027 /// Returns true if the node is currently participating in the group
1028 /// as a Replica or a Master.
1029 pub fn is_active(&self) -> bool {
1030 self.node_state.get_state().is_active()
1031 }
1032
1033 /// Get the node name.
1034 ///
1035 ///
1036 ///
1037 /// Returns the unique name used to identify this replicated environment.
1038 pub fn get_node_name(&self) -> &str {
1039 self.config.node_name.as_str()
1040 }
1041
1042 /// Get the group name.
1043 ///
1044 /// Returns the name of the replication group this node belongs to.
1045 pub fn get_group_name(&self) -> &str {
1046 self.config.group_name.as_str()
1047 }
1048
1049 /// Get the current master (if known).
1050 ///
1051 /// Returns the name of the node that is currently the master, or None
1052 /// if the master is not known (e.g. the node is in the Unknown or
1053 /// Detached state).
1054 pub fn get_master_name(&self) -> Option<String> {
1055 self.master_tracker.get_master()
1056 }
1057
1058 /// Get the replication group info.
1059 ///
1060 ///
1061 ///
1062 /// Returns a description of the replication group as known by this node.
1063 /// The replicated group metadata is stored in a replicated database and
1064 /// updates are propagated by the current master node to all replicas. If
1065 /// this node is not the master, it is possible for its description of the
1066 /// group to be out of date.
1067 pub fn get_group(&self) -> &GroupService {
1068 &self.group_service
1069 }
1070
1071 /// Add a peer node to the replication group at runtime.
1072 ///
1073 /// The node is registered in the `GroupService` so elections and quorum
1074 /// calculations immediately reflect the new membership.
1075 pub fn add_peer(&self, node: crate::rep_node::RepNode) -> Result<()> {
1076 use crate::group_service::NodeInfo;
1077 use std::time::Instant;
1078
1079 let info = NodeInfo {
1080 name: node.name.clone(),
1081 node_type: node.node_type,
1082 host: node.host.clone(),
1083 port: node.port,
1084 node_id: node.node_id,
1085 joined_at: Instant::now(),
1086 last_seen: Instant::now(),
1087 is_active: true,
1088 known_vlsn: 0,
1089 log_range: None,
1090 read_capacity_pct: node.read_capacity_pct,
1091 write_capacity_pct: node.write_capacity_pct,
1092 latency_hint_ms: node.latency_hint_ms,
1093 };
1094 self.group_service.add_node(info)?;
1095 log::info!(
1096 "Node '{}': added peer '{}' ({}:{}) to group '{}'",
1097 self.config.node_name,
1098 node.name,
1099 node.host,
1100 node.port,
1101 self.config.group_name,
1102 );
1103
1104 // F9: if we are the current master, immediately register a
1105 // `Feeder` tracker for the new peer so AckTracker bookkeeping
1106 // and downstream pull-based streaming work without a forced
1107 // re-election.
1108 if self.is_master()
1109 && (node.node_type == crate::node_type::NodeType::Electable
1110 || node.node_type == crate::node_type::NodeType::Secondary)
1111 {
1112 let mut feeders = self.feeders.write();
1113 if !feeders.iter().any(|f| f.get_replica_name() == node.name) {
1114 feeders.push(Feeder::new(node.name.clone()));
1115 log::debug!(
1116 "Node '{}' (master): dispatched Feeder for new peer '{}'",
1117 self.config.node_name,
1118 node.name,
1119 );
1120 }
1121 }
1122 Ok(())
1123 }
1124
1125 /// Remove a peer node from the replication group by name.
1126 ///
1127 /// The node is deregistered from the `GroupService`. Elections initiated
1128 /// after this call will not include the removed node in quorum calculations.
1129 pub fn remove_peer(&self, name: &str) -> Result<()> {
1130 self.group_service.remove_node(name)?;
1131 log::info!(
1132 "Node '{}': removed peer '{}' from group '{}'",
1133 self.config.node_name,
1134 name,
1135 self.config.group_name,
1136 );
1137 Ok(())
1138 }
1139
1140 /// Update the capacity and latency metadata of an existing peer.
1141 ///
1142 /// Only the following fields are updated from `node`:
1143 /// - `read_capacity_pct`
1144 /// - `write_capacity_pct`
1145 /// - `latency_hint_ms`
1146 ///
1147 /// The node's identity (name, address, port, node_type) is preserved.
1148 /// Safe to call while replication is active.
1149 ///
1150 /// If the quorum policy is `Flexible` or `Expression`, the quorum system
1151 /// is rebuilt to reflect the new capacity/latency weights.
1152 ///
1153 /// # Note
1154 ///
1155 /// `update_peer_metadata` does not currently re-run
1156 /// `QuorumPolicy::validate(electable_count)` after the metadata
1157 /// change. An LP-optimal `Expression` quorum that was safe before
1158 /// the update may no longer satisfy the intersection property
1159 /// afterwards. Until automatic revalidation lands, deployments
1160 /// using `QuorumPolicy::Expression` should call
1161 /// `quorum_policy().validate(get_rep_group().electable_count())`
1162 /// on the returned `RepGroup` after every metadata change and
1163 /// fail the operator-facing operation if validation reports
1164 /// unsafety.
1165 pub fn update_peer_metadata(
1166 &self,
1167 name: &str,
1168 node: crate::rep_node::RepNode,
1169 ) -> Result<()> {
1170 self.group_service.update_node_metadata(
1171 name,
1172 node.read_capacity_pct,
1173 node.write_capacity_pct,
1174 node.latency_hint_ms,
1175 )?;
1176 log::info!(
1177 "Node '{}': updated metadata for peer '{}' \
1178 (read_cap={}, write_cap={}, latency={}ms)",
1179 self.config.node_name,
1180 name,
1181 node.read_capacity_pct,
1182 node.write_capacity_pct,
1183 node.latency_hint_ms,
1184 );
1185 Ok(())
1186 }
1187
1188 /// Returns a snapshot of the current replication group as a `RepGroup`.
1189 ///
1190 /// The snapshot reflects the state at the time of the call; subsequent
1191 /// `add_peer` / `remove_peer` calls are not reflected in it.
1192 pub fn get_rep_group(&self) -> crate::rep_group::RepGroup {
1193 use crate::rep_group::RepGroup;
1194
1195 let mut group = RepGroup::new(
1196 self.config.group_name.clone(),
1197 self.group_service.get_group_id(),
1198 );
1199 for info in self.group_service.get_all_nodes() {
1200 let mut node = crate::rep_node::RepNode::new(
1201 info.name.clone(),
1202 info.node_type,
1203 info.host.clone(),
1204 info.port,
1205 info.node_id,
1206 );
1207 node.read_capacity_pct = info.read_capacity_pct;
1208 node.write_capacity_pct = info.write_capacity_pct;
1209 node.latency_hint_ms = info.latency_hint_ms;
1210 group.add_node(node);
1211 }
1212 group
1213 }
1214
1215 /// Get the replication configuration.
1216 ///
1217 ///
1218 ///
1219 /// Returns the replication configuration that has been used to create this
1220 /// environment.
1221 pub fn get_config(&self) -> &RepConfig {
1222 &self.config
1223 }
1224
1225 /// Get the current VLSN range on this node.
1226 ///
1227 /// Returns the range of VLSNs currently available on this node.
1228 pub fn get_vlsn_range(&self) -> VlsnRange {
1229 self.vlsn_index.get_range()
1230 }
1231
1232 /// Get the latest VLSN.
1233 ///
1234 /// Returns the most recent VLSN registered on this node.
1235 pub fn get_current_vlsn(&self) -> u64 {
1236 self.vlsn_index.get_latest_vlsn()
1237 }
1238
1239 /// Return the list of replica names that currently have a `Feeder`
1240 /// tracker on this (master) node.
1241 ///
1242 /// Used by tests and operator tooling. The returned list reflects
1243 /// the master's view at the time of the call; subsequent
1244 /// `add_peer`/`remove_peer` calls may change it.
1245 pub fn feeder_replica_names(&self) -> Vec<String> {
1246 self.feeders.read().iter().map(|f| f.get_replica_name()).collect()
1247 }
1248
1249 /// Bootstrap this node's environment by network-restoring all `.ndb`
1250 /// files from `peer_name` via the dispatcher's RESTORE service.
1251 ///
1252 /// Closes findings F2 / F4 of `docs/src/internal/api-audit-2026-05-rep.md`.
1253 ///
1254 /// The standalone `NetworkRestore::execute()` opens raw TCP and
1255 /// expects to drive the legacy `NetworkRestoreServer::start` listener.
1256 /// Production replicated environments host the RESTORE handler on the
1257 /// dispatcher, so this method routes through `execute_via_dispatcher`.
1258 ///
1259 /// `peer_name` must be a known peer in `GroupService`; on success the
1260 /// peer's `.ndb` files are written into `config.env_home`. Returns
1261 /// `Err` if `env_home` is `None`, the peer is unknown, or the restore
1262 /// fails for any reason.
1263 pub fn bootstrap_via_dispatcher(&self, peer_name: &str) -> Result<()> {
1264 let env_home = self.config.env_home.clone().ok_or_else(|| {
1265 RepError::ConfigError(
1266 "bootstrap_via_dispatcher requires env_home in RepConfig"
1267 .into(),
1268 )
1269 })?;
1270 let peer_info = self
1271 .group_service
1272 .get_all_nodes()
1273 .into_iter()
1274 .find(|n| n.name == peer_name)
1275 .ok_or_else(|| {
1276 RepError::ConfigError(format!(
1277 "peer '{}' not registered in group '{}'",
1278 peer_name, self.config.group_name,
1279 ))
1280 })?;
1281
1282 let cfg = NetworkRestoreConfig {
1283 source_node: peer_info.name.clone(),
1284 source_host: peer_info.host.clone(),
1285 source_port: peer_info.port,
1286 retain_log_files: true,
1287 };
1288 let restore = NetworkRestore::new(cfg).with_local_dir(env_home);
1289 restore.execute_via_dispatcher()?;
1290 log::info!(
1291 "Node '{}' bootstrapped via dispatcher from '{}' ({}:{})",
1292 self.config.node_name,
1293 peer_info.name,
1294 peer_info.host,
1295 peer_info.port,
1296 );
1297 Ok(())
1298 }
1299
1300 /// Get replication statistics.
1301 ///
1302 ///
1303 ///
1304 /// Returns statistics associated with this environment.
1305 pub fn get_stats(&self) -> &RepStats {
1306 &self.stats
1307 }
1308
1309 /// Get the ack tracker.
1310 pub fn get_ack_tracker(&self) -> &AckTracker {
1311 &self.ack_tracker
1312 }
1313
1314 /// Ensure the node state machine is in Unknown state, transitioning
1315 /// from Detached if necessary. This is needed because the state machine
1316 /// only allows Detached -> Unknown -> Master/Replica.
1317 pub fn ensure_unknown_state(&self) -> Result<()> {
1318 let current = self.node_state.get_state();
1319 match current {
1320 NodeState::Unknown => Ok(()),
1321 NodeState::Detached => {
1322 self.node_state.transition_to(NodeState::Unknown)?;
1323 Ok(())
1324 }
1325 // Master and Replica must transition through Unknown before
1326 // joining a new group or reconnecting.
1327 NodeState::Master | NodeState::Replica => {
1328 self.node_state.transition_to(NodeState::Unknown)?;
1329 Ok(())
1330 }
1331 NodeState::Shutdown => {
1332 Err(RepError::StateError("Node is shut down".to_string()))
1333 }
1334 }
1335 }
1336
1337 /// Transition to master state.
1338 ///
1339 /// Transitions this node to Master state for the given election term.
1340 /// As master, the node can accept write operations and feed log entries
1341 /// to replicas.
1342 ///
1343 /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
1344 /// a `FeederRunner` + `EnvironmentLogScanner` background thread is spawned
1345 /// for each currently-registered replica (feeder entries in `feeders`).
1346 ///
1347 /// In HA.
1348 pub fn become_master(&self, term: u64) -> Result<()> {
1349 if self.is_shutdown() {
1350 return Err(RepError::StateError(
1351 "Cannot become master: environment is closed".to_string(),
1352 ));
1353 }
1354
1355 // JE invariant: only `Electable` nodes can become master. `Secondary`,
1356 // `Monitor`, and `Arbiter` are not electable and must be rejected at
1357 // the API layer (mirrors JE `ExceptionTest`). See
1358 // `NodeType::can_be_master`.
1359 if !self.config.node_type.can_be_master() {
1360 return Err(RepError::InvalidStateTransition(format!(
1361 "node '{}' has type {} which is not electable as master",
1362 self.config.node_name.as_str(),
1363 self.config.node_type,
1364 )));
1365 }
1366
1367 // Ensure we can reach Master state (may need Detached -> Unknown first)
1368 self.ensure_unknown_state()?;
1369
1370 let old_state = self.node_state.get_state();
1371 self.node_state.transition_to(NodeState::Master)?;
1372 self.master_tracker.set_master(self.config.node_name.as_str(), term);
1373
1374 // --- F9: spawn Feeder trackers for each known replica -------------
1375 //
1376 // Closes finding F9 of `docs/src/internal/api-audit-2026-05-rep.md`.
1377 // The architecture is pull-based: replicas pull from the master's
1378 // `PEER_FEEDER` service via `catch_up_from_peer`. However, the
1379 // master must:
1380 // 1. Track each replica via a `Feeder` so AckTracker bookkeeping
1381 // can attribute replica acks to the right node.
1382 // 2. Push its own writes into `peer_scanner` so replicas pulling
1383 // from `PEER_FEEDER` actually receive entries (`replicate_entry`).
1384 //
1385 // Here we ensure step 1: every known electable peer in the group
1386 // gets a `Feeder` entry.
1387 {
1388 let mut feeders = self.feeders.write();
1389 // Drop any stale feeders left over from a prior role. A
1390 // `Feeder` is just an in-memory tracker; recreating it is
1391 // cheap and avoids state inversion bugs across role changes.
1392 feeders.clear();
1393 for peer in self.group_service.get_all_nodes() {
1394 if peer.name == self.config.node_name {
1395 continue;
1396 }
1397 if peer.node_type != crate::node_type::NodeType::Electable
1398 && peer.node_type != crate::node_type::NodeType::Secondary
1399 {
1400 // Arbiters do not receive log entries.
1401 continue;
1402 }
1403 feeders.push(Feeder::new(peer.name.clone()));
1404 log::debug!(
1405 "Node '{}' (master, term={}): registered Feeder for \
1406 replica '{}'",
1407 self.config.node_name.as_str(),
1408 term,
1409 peer.name,
1410 );
1411 }
1412 }
1413
1414 // For observability, log the count.
1415 log::info!(
1416 "Node '{}' became master for term {} \
1417 (feeder trackers: {} known replicas)",
1418 self.config.node_name.as_str(),
1419 term,
1420 self.feeders.read().len(),
1421 );
1422
1423 // -------------------------------------------------------------------
1424
1425 // Notify listeners
1426 self.notify_listeners(old_state, NodeState::Master);
1427
1428 Ok(())
1429 }
1430
1431 /// Transition to replica state with the given master.
1432 ///
1433 /// Transitions this node to Replica state. The node will receive log
1434 /// entries from the specified master.
1435 ///
1436 /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
1437 /// the method prepares an `EnvironmentLogWriter` so that replicated
1438 /// entries can be written to the local log. The actual network connection
1439 /// is established by the `TcpServiceDispatcher`; this method logs intent.
1440 ///
1441 /// In HA.
1442 pub fn become_replica(&self, master_name: &str) -> Result<()> {
1443 if self.is_shutdown() {
1444 return Err(RepError::StateError(
1445 "Cannot become replica: environment is closed".to_string(),
1446 ));
1447 }
1448
1449 // Ensure we can reach Replica state (may need Detached -> Unknown first)
1450 self.ensure_unknown_state()?;
1451
1452 let old_state = self.node_state.get_state();
1453 self.node_state.transition_to(NodeState::Replica)?;
1454 self.master_tracker.set_master(master_name, 0);
1455 self.replica_stream.set_master(master_name);
1456 self.replica_stream.set_state(
1457 crate::stream::replica_stream::ReplicaStreamState::Connecting,
1458 );
1459
1460 // --- G19: start replica receive loop --------------------------------
1461 //
1462 // Connects to the master's PEER_FEEDER service and runs a
1463 // ReplicaReceiver loop in a background thread. The receiver writes
1464 // replicated entries via EnvironmentLogWriter.
1465 if let Some(env) = self.env_impl.lock().unwrap().clone() {
1466 if let Some(log_mgr) = env.get_log_manager() {
1467 let vlsn_index =
1468 Arc::new(crate::vlsn::vlsn_index::VlsnIndex::new(10));
1469
1470 // Resolve the master's socket address from the GroupService.
1471 let master_addr_opt: Option<SocketAddr> = self
1472 .group_service
1473 .get_all_nodes()
1474 .iter()
1475 .find(|n| n.name == master_name)
1476 .and_then(|info| {
1477 format!("{}:{}", info.host, info.port)
1478 .parse::<SocketAddr>()
1479 .ok()
1480 });
1481
1482 let node_name = self.config.node_name.clone();
1483 let master = master_name.to_string();
1484 let vlsn_index_clone = Arc::clone(&vlsn_index);
1485 let shutdown_flag = self.io_shutdown.load(Ordering::SeqCst);
1486 // Wave 9-A fix 2: capture a Weak<Self> so the I/O thread
1487 // can call `bootstrap_via_dispatcher` automatically when
1488 // the master signals `NeedsRestore`. When the env was
1489 // never registered with `init_self_weak` (raw
1490 // `Arc::new(Self::new(...))` without going through
1491 // `open()` or the test harness), the weak ref is `None`
1492 // and we fall back to operator-driven bootstrap.
1493 let self_weak: Option<Weak<Self>> =
1494 self.self_weak.get().cloned();
1495
1496 let handle = std::thread::Builder::new()
1497 .name(format!("noxu-replica-{}", node_name))
1498 .spawn(move || {
1499 let mut writer = EnvironmentLogWriter::new(
1500 log_mgr,
1501 vlsn_index_clone,
1502 );
1503
1504 let Some(addr) = master_addr_opt else {
1505 log::warn!(
1506 "noxu-replica-{}: master '{}' address not in RepGroup; \
1507 waiting for TCP dispatcher connection",
1508 node_name, master,
1509 );
1510 return;
1511 };
1512
1513 // Catch-up loop: catch up, observe NeedsRestore,
1514 // optionally auto-bootstrap, retry once. We cap
1515 // the retry count at MAX_AUTO_BOOTSTRAP_ATTEMPTS
1516 // (small) so a misbehaving master does not loop
1517 // forever consuming network bandwidth.
1518 const MAX_AUTO_BOOTSTRAP_ATTEMPTS: u32 = 2;
1519 let mut attempts: u32 = 0;
1520 loop {
1521 log::info!(
1522 "noxu-replica-{}: connecting to master '{}' at {}",
1523 node_name, master, addr,
1524 );
1525 match crate::stream::peer_feeder::catch_up_from_peer(
1526 addr, 0, &mut writer,
1527 ) {
1528 Ok(true) => {
1529 log::info!(
1530 "noxu-replica-{}: catch-up complete from '{}'",
1531 node_name, master,
1532 );
1533 return;
1534 }
1535 Ok(false) => {
1536 // F2/F4: master signals NeedsRestore.
1537 // Wave 9-A fix 2: if a Weak<Self> was
1538 // plumbed in, upgrade it and call
1539 // `bootstrap_via_dispatcher` ourselves
1540 // so the replica auto-bootstraps and
1541 // resumes catch-up without operator
1542 // intervention.
1543 log::warn!(
1544 "noxu-replica-{}: master '{}' requires restore",
1545 node_name, master,
1546 );
1547 attempts += 1;
1548 if attempts > MAX_AUTO_BOOTSTRAP_ATTEMPTS {
1549 log::error!(
1550 "noxu-replica-{}: exceeded \
1551 auto-bootstrap attempts ({}); giving up",
1552 node_name,
1553 MAX_AUTO_BOOTSTRAP_ATTEMPTS,
1554 );
1555 return;
1556 }
1557 let env_arc = match self_weak
1558 .as_ref()
1559 .and_then(Weak::upgrade)
1560 {
1561 Some(e) => e,
1562 None => {
1563 // No back-ref or env dropped:
1564 // fall back to operator-driven
1565 // bootstrap and exit cleanly.
1566 log::warn!(
1567 "noxu-replica-{}: no back-reference \
1568 available; operator must call \
1569 bootstrap_via_dispatcher manually",
1570 node_name,
1571 );
1572 return;
1573 }
1574 };
1575 if env_arc.is_shutdown() {
1576 return;
1577 }
1578 log::info!(
1579 "noxu-replica-{}: auto-bootstrapping via \
1580 dispatcher from '{}' (attempt {})",
1581 node_name, master, attempts,
1582 );
1583 match env_arc
1584 .bootstrap_via_dispatcher(&master)
1585 {
1586 Ok(()) => {
1587 log::info!(
1588 "noxu-replica-{}: auto-bootstrap \
1589 succeeded; resuming catch-up",
1590 node_name,
1591 );
1592 // Drop the strong ref before
1593 // re-entering catch-up so we
1594 // do not keep the env alive
1595 // longer than necessary.
1596 drop(env_arc);
1597 continue;
1598 }
1599 Err(e) => {
1600 log::error!(
1601 "noxu-replica-{}: auto-bootstrap \
1602 failed: {}",
1603 node_name, e,
1604 );
1605 return;
1606 }
1607 }
1608 }
1609 Err(e) => {
1610 if !shutdown_flag {
1611 log::error!(
1612 "noxu-replica-{}: error from master '{}': {e}",
1613 node_name, master,
1614 );
1615 }
1616 return;
1617 }
1618 }
1619 }
1620 })
1621 .expect("failed to spawn noxu-replica thread");
1622
1623 self.io_threads.lock().unwrap().push(handle);
1624
1625 log::debug!(
1626 "Node '{}': replica receive thread started for master '{}'",
1627 self.config.node_name.as_str(),
1628 master_name,
1629 );
1630 } else {
1631 log::warn!(
1632 "Node '{}': no LogManager available (read-only env?); \
1633 replica I/O loop not started",
1634 self.config.node_name.as_str(),
1635 );
1636 }
1637 }
1638 // -------------------------------------------------------------------
1639
1640 // Notify listeners
1641 self.notify_listeners(old_state, NodeState::Replica);
1642
1643 log::info!(
1644 "Node '{}' became replica of master '{}'",
1645 self.config.node_name.as_str(),
1646 master_name
1647 );
1648 Ok(())
1649 }
1650
1651 /// Initiate a master transfer to the target node.
1652 ///
1653 ///
1654 ///
1655 /// Transfers the current master state from this node to one of the
1656 /// electable replicas. The replica that is actually chosen to be the new
1657 /// master is the one with which the Master Transfer can be completed most
1658 /// rapidly. The transfer operation ensures that all changes at this node
1659 /// are available at the new master upon conclusion of the operation.
1660 pub fn transfer_master(&self, config: MasterTransferConfig) -> Result<()> {
1661 if self.is_shutdown() {
1662 return Err(RepError::StateError(
1663 "Cannot transfer master: environment is closed".to_string(),
1664 ));
1665 }
1666
1667 if !self.is_master() {
1668 return Err(RepError::InvalidState(
1669 "Master transfer can only be initiated on the master node"
1670 .to_string(),
1671 ));
1672 }
1673
1674 log::info!(
1675 "Node '{}' initiating master transfer to '{}'",
1676 self.config.node_name.as_str(),
1677 config.target_node,
1678 );
1679
1680 // Closes finding F7 of `docs/src/internal/api-audit-2026-05-rep.md`.
1681 //
1682 // Steps:
1683 // 1. Locate the target's address.
1684 // 2. Compute the new term (current observed term + 1).
1685 // 3. Send TRANSFER_MASTER to the target — it will become master.
1686 // 4. Send TRANSFER_MASTER (with the same term + new master name) to
1687 // every other peer so they re-target.
1688 // 5. Demote self to Replica of the target.
1689 //
1690 // The transfer is best-effort: a peer that doesn't ack is logged
1691 // and skipped. The election driver will reconcile any divergence
1692 // on the next election round.
1693
1694 let target_addr = self
1695 .group_service
1696 .get_all_nodes()
1697 .into_iter()
1698 .find(|n| n.name == config.target_node)
1699 .and_then(|n| {
1700 format!("{}:{}", n.host, n.port)
1701 .parse::<std::net::SocketAddr>()
1702 .ok()
1703 })
1704 .ok_or_else(|| {
1705 RepError::ConfigError(format!(
1706 "transfer_master: target '{}' not registered or has bad address",
1707 config.target_node
1708 ))
1709 })?;
1710
1711 let new_term = self.master_tracker.get_term().saturating_add(1);
1712
1713 // 1. Tell the target to become master at the new term.
1714 let target_ack = crate::group_admin::send_transfer_master(
1715 target_addr,
1716 &config.target_node,
1717 new_term,
1718 )
1719 .map_err(|e| {
1720 RepError::NetworkError(format!(
1721 "transfer_master: failed to signal target '{}': {}",
1722 config.target_node, e
1723 ))
1724 })?;
1725 if !target_ack {
1726 return Err(RepError::StateError(format!(
1727 "transfer_master: target '{}' rejected the transfer",
1728 config.target_node
1729 )));
1730 }
1731
1732 // 2. Inform all other peers (best-effort).
1733 for peer in self.group_service.get_all_nodes() {
1734 if peer.name == self.config.node_name
1735 || peer.name == config.target_node
1736 {
1737 continue;
1738 }
1739 if let Ok(addr) = format!("{}:{}", peer.host, peer.port).parse() {
1740 let _ = crate::group_admin::send_transfer_master(
1741 addr,
1742 &config.target_node,
1743 new_term,
1744 );
1745 }
1746 }
1747
1748 // 3. Demote self to Replica of the new master.
1749 self.become_replica(&config.target_node)?;
1750
1751 log::info!(
1752 "Node '{}' transferred master to '{}' at term {}",
1753 self.config.node_name.as_str(),
1754 config.target_node,
1755 new_term,
1756 );
1757 Ok(())
1758 }
1759
1760 /// Register a VLSN (as master, after writing a log entry).
1761 ///
1762 /// Maps the given VLSN to the specified log file position. This is called
1763 /// by the master after it writes a replicated log entry.
1764 pub fn register_vlsn(&self, vlsn: u64, file_number: u32, file_offset: u32) {
1765 self.vlsn_index.register(vlsn, file_number, file_offset);
1766 }
1767
1768 /// Replicate a freshly committed log entry from the master.
1769 ///
1770 /// Closes finding F9 of `docs/src/internal/api-audit-2026-05-rep.md`.
1771 ///
1772 /// Combines `register_vlsn` with a push into the in-memory
1773 /// `peer_scanner` so that downstream replicas pulling from this
1774 /// node's `PEER_FEEDER` service (via `catch_up_from_peer`) can
1775 /// stream the entry without round-tripping through the on-disk
1776 /// log. The local log is still the source of truth; the peer
1777 /// scanner is a fast-path cache that bounds itself via
1778 /// `PeerLogScanner::with_capacity` so old entries are evicted.
1779 ///
1780 /// Should be called by the master after the local commit has
1781 /// fsynced. Calling on a non-master is harmless (the peer
1782 /// scanner cache is also used by replicas) but is logged at trace
1783 /// level for diagnostics.
1784 pub fn replicate_entry(
1785 &self,
1786 vlsn: u64,
1787 file_number: u32,
1788 file_offset: u32,
1789 entry_type: u8,
1790 data: Vec<u8>,
1791 ) {
1792 self.vlsn_index.register(vlsn, file_number, file_offset);
1793 self.peer_scanner.push(vlsn, entry_type, data);
1794 if !self.is_master() {
1795 log::trace!(
1796 "replicate_entry called on non-master node '{}': vlsn={}, type={}",
1797 self.config.node_name,
1798 vlsn,
1799 entry_type,
1800 );
1801 }
1802 }
1803
1804 /// Apply a replicated entry (as replica).
1805 ///
1806 /// Applies a log entry received from the master. This is called by the
1807 /// replica stream handler after receiving an entry from the feeder.
1808 ///
1809 /// `data` is the wire-encoded log-record payload. When the
1810 /// replicated environment has not been wired to a local
1811 /// `noxu_db::Environment` (i.e., before `with_environment` is
1812 /// called) the payload is forwarded into the in-memory peer
1813 /// scanner so that downstream replicas attached to the
1814 /// `PEER_FEEDER` service can re-stream it; the local log is **not**
1815 /// updated. This is documented behaviour rather than a stub — see
1816 /// `api-audit-2026-05-rep.md` finding #26 (medium) for the
1817 /// `with_environment`-required local-apply path.
1818 /// cleanup (rep info F35: `_data` placeholder) renames the leading
1819 /// underscore so reviewers don't read it as a TODO.
1820 pub fn apply_entry(
1821 &self,
1822 vlsn: u64,
1823 entry_type: u8,
1824 data: Vec<u8>,
1825 ) -> Result<()> {
1826 if self.is_shutdown() {
1827 return Err(RepError::StateError(
1828 "Cannot apply entry: environment is closed".to_string(),
1829 ));
1830 }
1831
1832 // Register the VLSN in the index.
1833 self.vlsn_index.register(vlsn, 0, 0);
1834
1835 // Push into the peer log scanner so downstream replicas can
1836 // receive this entry via the PEER_FEEDER service.
1837 self.peer_scanner.push(vlsn, entry_type, data);
1838
1839 log::trace!(
1840 "Applied replicated entry: vlsn={}, type={}",
1841 vlsn,
1842 entry_type
1843 );
1844 Ok(())
1845 }
1846
1847 /// Record an ack from a replica (as master).
1848 ///
1849 /// Records that the specified replica has acknowledged processing up to
1850 /// the given VLSN. This is used by the master to track durability
1851 /// guarantees.
1852 pub fn record_ack(&self, vlsn: u64, replica_name: &str) {
1853 self.ack_tracker.record_ack(vlsn, replica_name);
1854 }
1855
1856 /// Set the state change listener.
1857 ///
1858 ///
1859 ///
1860 /// Sets the listener used to receive asynchronous replication node state
1861 /// change events. Note that there is one listener per replication node,
1862 /// not one per handle. Invoking this method adds to the set of listeners.
1863 ///
1864 /// Invoking this method typically results in an immediate callback to the
1865 /// application via the `on_state_change` method, so that the application
1866 /// is made aware of the existing state of the node at the time the listener
1867 /// is first established.
1868 pub fn set_state_change_listener(
1869 &self,
1870 listener: Arc<dyn StateChangeListener>,
1871 ) {
1872 // Immediately notify the listener of the current state
1873 let current_state = self.node_state.get_state();
1874 let event = StateChangeEvent::new(
1875 current_state,
1876 current_state,
1877 self.get_master_name(),
1878 );
1879 listener.on_state_change(event);
1880
1881 let mut listeners = self.listeners.write();
1882 listeners.push(listener);
1883 }
1884
1885 /// Close the replicated environment.
1886 ///
1887 ///
1888 ///
1889 /// Closes this handle and releases any resources. When closed, daemon
1890 /// threads are stopped, even if they are performing work. The node ceases
1891 /// participation in the replication group. If the node was currently the
1892 /// master, the rest of the group will hold an election.
1893 ///
1894 /// The ReplicatedEnvironment should not be closed while any other type of
1895 /// handle that refers to it is not yet closed.
1896 pub fn close(&self) -> Result<()> {
1897 if self.shutdown.swap(true, Ordering::SeqCst) {
1898 // Already closed
1899 return Ok(());
1900 }
1901
1902 let old_state = self.node_state.get_state();
1903
1904 // Transition to Shutdown state. The state machine allows this from
1905 // any non-Shutdown state.
1906 let _ = self.node_state.transition_to(NodeState::Shutdown);
1907
1908 // Notify listeners of the shutdown
1909 self.notify_listeners(old_state, NodeState::Shutdown);
1910
1911 // Clear feeders
1912 {
1913 let mut feeders = self.feeders.write();
1914 feeders.clear();
1915 }
1916
1917 // Signal and join all I/O threads spawned by become_master /
1918 // become_replica / start_vlsn_persistence_daemon. The vlsn-flush
1919 // thread does a final flush on its way out so a clean close is
1920 // recoverable. Closes finding F11.
1921 self.io_shutdown.store(true, Ordering::SeqCst);
1922 {
1923 let mut threads = self.io_threads.lock().unwrap();
1924 for handle in threads.drain(..) {
1925 let _ = handle.join();
1926 }
1927 }
1928
1929 // Belt-and-braces: even when no daemon is running (e.g.
1930 // `ReplicatedEnvironment::new` without `open`), persist a final
1931 // snapshot if env_home is configured.
1932 if let Some(ref home) = self.config.env_home
1933 && let Err(e) =
1934 crate::vlsn::persist::flush_to_disk(&self.vlsn_index, home)
1935 {
1936 log::warn!(
1937 "close: failed to persist VLSN index to {}: {}",
1938 home.display(),
1939 e
1940 );
1941 }
1942
1943 // Stop the service dispatcher (the: serviceDispatcher.shutdown()).
1944 if let Some(ref dispatcher) = self.tcp_dispatcher {
1945 dispatcher.stop();
1946 let kind = if dispatcher.is_tls() { "TLS" } else { "TCP" };
1947 log::debug!(
1948 "Node '{}' {} service dispatcher stopped",
1949 self.config.node_name.as_str(),
1950 kind,
1951 );
1952 }
1953
1954 log::info!(
1955 "Replicated environment '{}' in group '{}' closed",
1956 self.config.node_name.as_str(),
1957 self.config.group_name.as_str()
1958 );
1959
1960 Ok(())
1961 }
1962
1963 /// Close this handle and shut down the Replication Group by forcing all
1964 /// active Replicas to exit.
1965 ///
1966 ///
1967 ///
1968 /// This method must be invoked on the node that's currently the Master
1969 /// after all other outstanding handles have been closed. The Master waits
1970 /// for all active Replicas to catch up so that they have a current set of
1971 /// logs, and then shuts them down.
1972 pub fn shutdown_group(
1973 &self,
1974 replica_shutdown_timeout_ms: u64,
1975 ) -> Result<()> {
1976 if !self.is_master() {
1977 return Err(RepError::InvalidState(
1978 "shutdownGroup must be invoked on the master".to_string(),
1979 ));
1980 }
1981
1982 log::info!(
1983 "Node '{}' shutting down replication group '{}' (replica_timeout={}ms)",
1984 self.config.node_name.as_str(),
1985 self.config.group_name.as_str(),
1986 replica_shutdown_timeout_ms,
1987 );
1988
1989 // Closes finding F8 of `docs/src/internal/api-audit-2026-05-rep.md`.
1990 //
1991 // Send SHUTDOWN_GROUP to every known peer. The recipient calls
1992 // its own `close()` and the per-connection ADMIN handler
1993 // returns ACK_OK. Any peer that doesn't ack within the
1994 // timeout is logged and the master proceeds. After signalling
1995 // every peer, the master closes its own env.
1996 let deadline = std::time::Instant::now()
1997 + Duration::from_millis(replica_shutdown_timeout_ms);
1998
1999 for peer in self.group_service.get_all_nodes() {
2000 if peer.name == self.config.node_name {
2001 continue;
2002 }
2003 // Don't exceed the deadline waiting for any single peer.
2004 let now = std::time::Instant::now();
2005 if now >= deadline {
2006 log::warn!(
2007 "shutdown_group: deadline reached; skipping remaining peers"
2008 );
2009 break;
2010 }
2011 let addr_str = format!("{}:{}", peer.host, peer.port);
2012 let addr = match addr_str.parse::<SocketAddr>() {
2013 Ok(a) => a,
2014 Err(e) => {
2015 log::warn!(
2016 "shutdown_group: peer '{}' has bad address {}: {}",
2017 peer.name,
2018 addr_str,
2019 e
2020 );
2021 continue;
2022 }
2023 };
2024 match crate::group_admin::send_shutdown_group(addr) {
2025 Ok(true) => log::info!(
2026 "shutdown_group: peer '{}' acknowledged",
2027 peer.name
2028 ),
2029 Ok(false) => log::warn!(
2030 "shutdown_group: peer '{}' rejected the request",
2031 peer.name
2032 ),
2033 Err(e) => log::warn!(
2034 "shutdown_group: peer '{}' unreachable: {}",
2035 peer.name,
2036 e
2037 ),
2038 }
2039 }
2040
2041 // Master closes itself last.
2042 self.close()
2043 }
2044
2045 /// Check if shutdown is in progress.
2046 pub fn is_shutdown(&self) -> bool {
2047 self.shutdown.load(Ordering::SeqCst)
2048 }
2049
2050 /// Notify all registered listeners of a state change.
2051 fn notify_listeners(&self, old_state: NodeState, new_state: NodeState) {
2052 let listeners = self.listeners.read();
2053 if !listeners.is_empty() {
2054 let event = StateChangeEvent::new(
2055 old_state,
2056 new_state,
2057 self.get_master_name(),
2058 );
2059 for listener in listeners.iter() {
2060 listener.on_state_change(event.clone());
2061 }
2062 }
2063 }
2064}
2065
2066// ---------------------------------------------------------------------------
2067// F1: ReplicaAckCoordinator impl wires master commits into the AckTracker.
2068// ---------------------------------------------------------------------------
2069//
2070// `noxu_db::Transaction::commit_with_durability` calls
2071// `await_replica_acks` after the local WAL fsync. This impl:
2072//
2073// 1. Rejects calls on a non-master node with `NotMaster`.
2074// 2. Rejects calls during shutdown with `Shutdown`.
2075// 3. Computes the required ack count from `electable_count` and the
2076// requested policy.
2077// 4. Allocates a unique commit sequence number, registers the ack
2078// requirement on the `AckTracker`, and polls `is_satisfied` with
2079// a small sleep until either the timeout elapses or the policy
2080// is satisfied.
2081// 5. Cleans up the tracker entry on every exit path.
2082//
2083// Closes finding F1 of `docs/src/internal/api-audit-2026-05-rep.md`.
2084impl ReplicaAckCoordinator for ReplicatedEnvironment {
2085 fn await_replica_acks(
2086 &self,
2087 policy: ReplicaAckPolicyKind,
2088 timeout: Duration,
2089 ) -> std::result::Result<u32, AckWaitError> {
2090 // Fast-path: ReplicaAckPolicy::None never blocks. The trait spec
2091 // says callers may already short-circuit, but be defensive.
2092 if matches!(policy, ReplicaAckPolicyKind::None) {
2093 return Ok(0);
2094 }
2095
2096 if self.is_shutdown() {
2097 return Err(AckWaitError {
2098 kind: AckWaitErrorKind::Shutdown,
2099 needed: 0,
2100 received: 0,
2101 });
2102 }
2103
2104 if !self.is_master() {
2105 return Err(AckWaitError {
2106 kind: AckWaitErrorKind::NotMaster,
2107 needed: 0,
2108 received: 0,
2109 });
2110 }
2111
2112 // Count electable peers (excluding the master) using the
2113 // RepGroup view, which counts Arbiters and Electables
2114 // identically. Only Electable nodes are counted as data
2115 // replicas able to ack a commit. The master itself is
2116 // *implicit*: it is not registered in `group_service` (only
2117 // peers are), so we add 1 to obtain the total electable
2118 // count expected by `ReplicaAckPolicyKind::required_acks`.
2119 let group = self.get_rep_group();
2120 let electable_peers: u32 = group
2121 .get_nodes()
2122 .iter()
2123 .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2124 .count() as u32;
2125 let electable_count: u32 = electable_peers + 1; // +1 for self/master
2126
2127 let needed = policy.required_acks(electable_count);
2128 if needed == 0 {
2129 // Single-node group, or All with only the master itself.
2130 return Ok(0);
2131 }
2132
2133 let commit_seq = self
2134 .commit_ack_seq
2135 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2136 self.ack_tracker.register(commit_seq, needed);
2137
2138 // Poll-with-sleep loop. The poll interval is small enough that
2139 // late acks satisfy the policy promptly, and large enough that
2140 // a single commit waiting on a slow replica does not spin a
2141 // CPU.
2142 let poll_interval =
2143 std::cmp::min(timeout / 50, Duration::from_millis(20));
2144 let poll_interval = if poll_interval.is_zero() {
2145 Duration::from_millis(1)
2146 } else {
2147 poll_interval
2148 };
2149 let deadline = std::time::Instant::now() + timeout;
2150
2151 loop {
2152 if self.ack_tracker.is_satisfied(commit_seq) {
2153 self.ack_tracker.cleanup_through(commit_seq);
2154 return Ok(needed);
2155 }
2156 if self.is_shutdown() {
2157 self.ack_tracker.cleanup_through(commit_seq);
2158 return Err(AckWaitError {
2159 kind: AckWaitErrorKind::Shutdown,
2160 needed,
2161 received: 0,
2162 });
2163 }
2164 let now = std::time::Instant::now();
2165 if now >= deadline {
2166 // Tear down the registration so it doesn't accumulate;
2167 // record the partial ack count so the caller can report
2168 // a useful `InsufficientReplicas { required, available }`.
2169 let received =
2170 self.ack_tracker.received_count(commit_seq).unwrap_or(0);
2171 self.ack_tracker.cleanup_through(commit_seq);
2172 return Err(AckWaitError {
2173 kind: AckWaitErrorKind::Timeout,
2174 needed,
2175 received,
2176 });
2177 }
2178 let sleep_for = std::cmp::min(
2179 poll_interval,
2180 deadline.saturating_duration_since(now),
2181 );
2182 std::thread::sleep(sleep_for);
2183 }
2184 }
2185
2186 /// X-3: allocate the next VLSN for a recovered XA commit and register
2187 /// `lsn` in the VLSN index so feeders can stream the commit.
2188 ///
2189 /// Increments off the current latest VLSN so the new VLSN is strictly
2190 /// monotonically increasing. In a single-node or master-less environment
2191 /// (not master) returns 0 (NULL_VLSN — harmless, the default).
2192 fn alloc_vlsn_for_recovered_commit(&self, lsn: noxu_util::Lsn) -> u64 {
2193 // Only allocate a VLSN when we are the master; on a replica the
2194 // recovered XA should have been replicated by the original master.
2195 if !self.is_master() {
2196 return 0;
2197 }
2198 let next_vlsn = self.vlsn_index.get_latest_vlsn() + 1;
2199 self.vlsn_index.register(
2200 next_vlsn,
2201 lsn.file_number(),
2202 lsn.file_offset(),
2203 );
2204 log::debug!(
2205 "alloc_vlsn_for_recovered_commit: allocated vlsn={} for lsn={:?}",
2206 next_vlsn,
2207 lsn
2208 );
2209 next_vlsn
2210 }
2211
2212 /// R-3: pre-allocate the next commit VLSN WITHOUT registering in the index.
2213 ///
2214 /// The caller writes the `TxnCommit` WAL entry with this VLSN embedded,
2215 /// then calls `register_recovered_commit_vlsn` with the actual commit LSN.
2216 /// This two-step approach ensures the WAL entry carries the VLSN so the
2217 /// X-14 VLSN rebuild on second crash can find it.
2218 fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64 {
2219 if !self.is_master() {
2220 return 0;
2221 }
2222 // Peek at the next VLSN without registering. The actual registration
2223 // happens in register_recovered_commit_vlsn() after the WAL write.
2224 self.vlsn_index.get_latest_vlsn() + 1
2225 }
2226
2227 /// R-3: register a pre-allocated VLSN in the VLSN index with the actual
2228 /// commit LSN. Called after writing the `TxnCommit` WAL entry.
2229 fn register_recovered_commit_vlsn(
2230 &self,
2231 vlsn: u64,
2232 commit_lsn: noxu_util::Lsn,
2233 ) {
2234 if vlsn == 0 || !self.is_master() {
2235 return;
2236 }
2237 self.vlsn_index.register(
2238 vlsn,
2239 commit_lsn.file_number(),
2240 commit_lsn.file_offset(),
2241 );
2242 log::debug!(
2243 "register_recovered_commit_vlsn: registered vlsn={} for commit_lsn={:?}",
2244 vlsn,
2245 commit_lsn
2246 );
2247 }
2248}
2249
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};

    /// Build a config on a fixed port for unit-style tests. No real TCP
    /// bind is needed — hostname "localhost" resolves but the port might
    /// be in use; use `test_config_port0` for real TCP tests.
    fn test_config(node_name: &str) -> RepConfig {
        let builder = RepConfig::builder("test_group", node_name, "localhost");
        builder.node_port(5001).build()
    }

    /// Build a config that binds to an OS-assigned port.
    fn test_config_port0(node_name: &str) -> RepConfig {
        let builder = RepConfig::builder("test_group", node_name, "127.0.0.1");
        builder.node_port(0).build()
    }

    #[test]
    fn test_initial_state_is_detached() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        // A fresh NodeStateMachine sits in Detached.
        assert_eq!(node.get_state(), NodeState::Detached);
        assert!(!node.is_master());
        assert!(!node.is_replica());
        assert!(!node.is_active());
    }

    #[test]
    fn test_become_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();
        assert_eq!(node.get_state(), NodeState::Master);
        assert!(node.is_master());
        assert!(!node.is_replica());
        assert!(node.is_active());
    }

    #[test]
    fn test_become_replica() {
        let node = ReplicatedEnvironment::new(test_config("node2")).unwrap();
        node.become_replica("node1").unwrap();
        assert_eq!(node.get_state(), NodeState::Replica);
        assert!(!node.is_master());
        assert!(node.is_replica());
        assert!(node.is_active());
    }

    #[test]
    fn test_get_node_name() {
        let node = ReplicatedEnvironment::new(test_config("my_node")).unwrap();
        assert_eq!(node.get_node_name(), "my_node");
    }

    #[test]
    fn test_get_group_name() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(node.get_group_name(), "test_group");
    }

    #[test]
    fn test_register_vlsn_updates_index() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.register_vlsn(1, 0, 100);
        node.register_vlsn(2, 0, 200);
        node.register_vlsn(3, 0, 300);

        assert_eq!(node.get_current_vlsn(), 3);
        let span = node.get_vlsn_range();
        assert_eq!(span.first(), 1);
        assert_eq!(span.last(), 3);
    }

    #[test]
    fn test_record_ack() {
        let node = ReplicatedEnvironment::new(test_config("master")).unwrap();
        node.become_master(1).unwrap();

        node.register_vlsn(1, 0, 100);
        // One ack is required for VLSN 1; a single replica ack meets it.
        node.get_ack_tracker().register(1, 1);
        node.record_ack(1, "replica1");
        assert!(node.get_ack_tracker().is_satisfied(1));
    }

    #[test]
    fn test_close_sets_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert!(!node.is_shutdown());

        node.close().unwrap();
        assert!(node.is_shutdown());
        // Closing also moves the state machine to Shutdown.
        assert_eq!(node.get_state(), NodeState::Shutdown);
    }

    #[test]
    fn test_close_is_idempotent() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.close().unwrap();
        // A second close must succeed as well.
        node.close().unwrap();
        assert!(node.is_shutdown());
    }

    #[test]
    fn test_cannot_become_master_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.close().unwrap();

        assert!(node.become_master(1).is_err());
    }

    #[test]
    fn test_cannot_become_replica_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.close().unwrap();

        assert!(node.become_replica("master").is_err());
    }

    #[test]
    fn test_cannot_apply_entry_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.close().unwrap();

        assert!(node.apply_entry(1, 0, vec![1, 2, 3]).is_err());
    }

    #[test]
    fn test_cannot_transfer_master_when_not_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_replica("other").unwrap();

        let transfer = MasterTransferConfig::new(
            "target_node".to_string(),
            Duration::from_secs(30),
        );
        assert!(node.transfer_master(transfer).is_err());
    }

    #[test]
    fn test_transfer_master_requires_registered_target() {
        // F7: transfer_master is no longer a no-op; it sends an ADMIN
        // TRANSFER_MASTER signal to the target via TCP. An unregistered
        // target is rejected at the address-resolution step.
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();

        let transfer = MasterTransferConfig::new(
            "unknown_target".to_string(),
            Duration::from_secs(30),
        );
        let outcome = node.transfer_master(transfer);
        assert!(
            outcome.is_err(),
            "transfer_master to unregistered target must error"
        );
    }

    #[test]
    fn test_apply_entry_registers_vlsn() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_replica("master").unwrap();

        node.apply_entry(1, 0, vec![1, 2, 3]).unwrap();
        node.apply_entry(2, 0, vec![4, 5, 6]).unwrap();

        assert_eq!(node.get_current_vlsn(), 2);
    }

    #[test]
    fn test_master_name_tracking() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        // No master is known before any role has been taken.
        assert!(node.get_master_name().is_none());

        // Once this node is master, it reports itself.
        node.become_master(1).unwrap();
        assert_eq!(node.get_master_name(), Some("node1".to_string()));
    }

    #[test]
    fn test_master_to_replica_transition() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        // Start out as master.
        node.become_master(1).unwrap();
        assert_eq!(node.get_master_name(), Some("node1".to_string()));

        // Master -> Replica is a valid transition.
        node.become_replica("other_master").unwrap();
        assert_eq!(node.get_master_name(), Some("other_master".to_string()));
        assert!(node.is_replica());
    }

    #[test]
    fn test_state_change_listener_notification() {
        struct TestListener {
            call_count: AtomicU32,
            last_new_state: noxu_sync::Mutex<Option<NodeState>>,
        }

        impl StateChangeListener for TestListener {
            fn on_state_change(&self, event: StateChangeEvent) {
                self.call_count.fetch_add(1, AtomicOrdering::SeqCst);
                *self.last_new_state.lock() = Some(event.new_state);
            }
        }

        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        let probe = Arc::new(TestListener {
            call_count: AtomicU32::new(0),
            last_new_state: noxu_sync::Mutex::new(None),
        });

        // Registration alone produces one callback.
        node.set_state_change_listener(probe.clone());
        assert_eq!(probe.call_count.load(AtomicOrdering::SeqCst), 1);

        // A real state change produces a second one.
        node.become_master(1).unwrap();
        assert_eq!(probe.call_count.load(AtomicOrdering::SeqCst), 2);
        assert_eq!(*probe.last_new_state.lock(), Some(NodeState::Master));
    }

    #[test]
    fn test_close_notifies_listeners() {
        struct ShutdownListener {
            shutdown_seen: AtomicBool,
        }

        impl StateChangeListener for ShutdownListener {
            fn on_state_change(&self, event: StateChangeEvent) {
                if event.new_state == NodeState::Shutdown {
                    self.shutdown_seen.store(true, AtomicOrdering::SeqCst);
                }
            }
        }

        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        let probe = Arc::new(ShutdownListener {
            shutdown_seen: AtomicBool::new(false),
        });

        // The registration callback reports the current (Detached) state.
        node.set_state_change_listener(probe.clone());

        // Become master first so the close transition is meaningful.
        node.become_master(1).unwrap();
        assert!(!probe.shutdown_seen.load(AtomicOrdering::SeqCst));

        node.close().unwrap();
        assert!(probe.shutdown_seen.load(AtomicOrdering::SeqCst));
    }

    #[test]
    fn test_shutdown_group_requires_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_replica("other").unwrap();

        assert!(node.shutdown_group(5000).is_err());
    }

    #[test]
    fn test_shutdown_group_as_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();

        assert!(node.shutdown_group(5000).is_ok());
        assert!(node.is_shutdown());
    }

    #[test]
    fn test_get_config() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        let config = node.get_config();
        assert_eq!(config.node_name, "node1");
        assert_eq!(config.group_name, "test_group");
    }

    #[test]
    fn test_get_stats() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        // Reaching this point without a panic is the whole check.
        let _stats = node.get_stats();
    }

    // -----------------------------------------------------------------------
    // TCP dispatcher tests (H-5 / H-7)
    // -----------------------------------------------------------------------

    #[test]
    fn test_tcp_dispatcher_starts_on_new() {
        // Port 0 lets the OS assign an ephemeral port.
        let node =
            ReplicatedEnvironment::new(test_config_port0("tcp_node")).unwrap();
        // The dispatcher must have started and bound a real port.
        let addr = node.bound_addr().expect("expected a bound address");
        assert_ne!(addr.port(), 0, "OS should assign a non-zero port");
    }

    #[test]
    fn test_tcp_dispatcher_stops_on_close() {
        let node =
            ReplicatedEnvironment::new(test_config_port0("tcp_node2")).unwrap();
        // Running right after construction.
        assert!(
            node.tcp_dispatcher
                .as_ref()
                .is_some_and(|d| d.is_running())
        );

        node.close().unwrap();

        // Stopped once the environment has been closed.
        assert!(
            !node
                .tcp_dispatcher
                .as_ref()
                .is_some_and(|d| d.is_running()),
            "dispatcher should be stopped after close"
        );
    }

    #[test]
    fn test_tcp_dispatcher_accepts_connection() {
        use crate::net::Channel;
        use crate::net::ServiceHandler;
        use crate::net::service_dispatcher::connect_to_service;

        struct PingHandler {
            count: AtomicU32,
        }
        impl ServiceHandler for PingHandler {
            fn service_name(&self) -> &str {
                "ping"
            }
            fn handle(&self, ch: Box<dyn Channel>) -> crate::error::Result<()> {
                self.count.fetch_add(1, AtomicOrdering::SeqCst);
                // Echo the first message back.
                if let Ok(Some(msg)) = ch.receive(Duration::from_secs(2)) {
                    let _ = ch.send(&msg);
                }
                Ok(())
            }
        }

        let node =
            ReplicatedEnvironment::new(test_config_port0("tcp_node3")).unwrap();
        let addr = node.bound_addr().expect("dispatcher must be bound");

        // Register a ping handler on the running dispatcher.
        if let Some(ref dispatcher) = node.tcp_dispatcher {
            let ping = Arc::new(PingHandler { count: AtomicU32::new(0) });
            dispatcher.register("ping", ping.clone());

            // Give the accept thread a moment.
            std::thread::sleep(Duration::from_millis(20));

            let client = connect_to_service(addr, "ping").unwrap();
            client.send(b"hello").unwrap();
            let echoed = client.receive(Duration::from_secs(2)).unwrap();
            assert_eq!(echoed, Some(b"hello".to_vec()));

            assert_eq!(ping.count.load(AtomicOrdering::SeqCst), 1);
        }

        node.close().unwrap();
    }

    #[test]
    fn test_become_master_auto_transitions_from_detached() {
        // The state machine requires Detached -> Unknown -> Master.
        // become_master() should handle this automatically.
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(node.get_state(), NodeState::Detached);
        node.become_master(1).unwrap();
        assert_eq!(node.get_state(), NodeState::Master);
    }

    #[test]
    fn test_become_replica_auto_transitions_from_detached() {
        // The state machine requires Detached -> Unknown -> Replica.
        // become_replica() should handle this automatically.
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(node.get_state(), NodeState::Detached);
        node.become_replica("master_node").unwrap();
        assert_eq!(node.get_state(), NodeState::Replica);
    }

    #[test]
    fn test_cannot_transfer_master_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();
        node.close().unwrap();

        let transfer = MasterTransferConfig::new(
            "target".to_string(),
            Duration::from_secs(30),
        );
        assert!(node.transfer_master(transfer).is_err());
    }

    #[test]
    fn test_full_lifecycle() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        // Start as detached
        assert_eq!(node.get_state(), NodeState::Detached);

        // Become master
        node.become_master(1).unwrap();
        assert!(node.is_master());

        // Register some VLSNs
        node.register_vlsn(1, 0, 100);
        node.register_vlsn(2, 0, 200);

        // Record acks from a replica
        node.record_ack(1, "replica1");
        node.record_ack(2, "replica1");

        // Transition to replica (simulating failover)
        node.become_replica("node2").unwrap();
        assert!(node.is_replica());

        // Apply an entry from the new master
        node.apply_entry(3, 0, vec![7, 8, 9]).unwrap();

        // Close
        node.close().unwrap();
        assert!(node.is_shutdown());
    }

    /// Verify that `with_environment` lazily registers the RESTORE service on
    /// the TCP dispatcher when `config.env_home` was not set at construction.
    ///
    /// This mirrors `RepNode.envSetup()`, which registers the restore handler
    /// when the environment is wired into the replicated node.
    #[test]
    fn test_restore_registered_lazily_via_with_environment() {
        use noxu_dbi::EnvironmentImpl;
        use tempfile::TempDir;

        let dir = TempDir::new().expect("temp dir");

        // Config WITHOUT env_home — the dispatcher starts, but there is no
        // RESTORE handler yet.
        let config = RepConfig::builder("test_group", "node1", "127.0.0.1")
            .node_port(0)
            .build();

        let rep_env = ReplicatedEnvironment::new(config).unwrap();

        // Not yet registered.
        assert!(!rep_env.restore_registered.load(AtomicOrdering::SeqCst));

        // Wire in a real EnvironmentImpl so get_env_home() returns the temp dir.
        let env_impl = Arc::new(
            EnvironmentImpl::new(dir.path(), false, false).expect("open env"),
        );
        rep_env.with_environment(env_impl);

        // Now the RESTORE service must be registered.
        assert!(rep_env.restore_registered.load(AtomicOrdering::SeqCst));
    }

    /// Verify that when `config.env_home` IS set at construction, the RESTORE
    /// service is registered immediately (not deferred).
    #[test]
    fn test_restore_registered_eagerly_when_env_home_in_config() {
        use tempfile::TempDir;

        let dir = TempDir::new().expect("temp dir");

        let config = RepConfig::builder("test_group", "node2", "127.0.0.1")
            .node_port(0)
            .env_home(dir.path())
            .build();

        let rep_env = ReplicatedEnvironment::new(config).unwrap();

        // Registered immediately because env_home was in the config.
        assert!(rep_env.restore_registered.load(AtomicOrdering::SeqCst));
    }
}