noxu_rep/replicated_environment.rs
1//! The main replicated environment API.
2//!
3//!
4//! A replicated database environment that is a node in a replication group.
5//! This is the entry point for replication. It wraps a standard Environment
6//! and adds replication capabilities including master election, replica
7//! streaming, and commit acknowledgments.
8//!
9//! # Replication node states
10//!
11//! The replication node state determines the operations that the application
12//! can perform against its replicated environment. The state transitions
13//! visible to the application can be summarized by the regular expression:
14//!
15//! ```text
16//! [ MASTER | REPLICA | UNKNOWN ]+ DETACHED
17//! ```
18//!
19//! When the first handle to a `ReplicatedEnvironment` is created and the node
20//! is brought up, the node usually establishes Master or Replica state. These
21//! states are preceded by the Unknown state. As various remote nodes become
22//! unavailable and elections are held, the local node may change between
23//! Master and Replica states, always with a (usually brief) transition through
24//! Unknown state.
25//!
26//! When the environment is closed, the node transitions to the Detached state.
27
28use noxu_dbi::{
29 AckWaitError, AckWaitErrorKind, EnvironmentImpl, ReplicaAckCoordinator,
30 ReplicaAckPolicyKind,
31};
32use noxu_sync::RwLock;
33use std::net::SocketAddr;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::OnceLock;
37use std::sync::Weak;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use crate::ack_tracker::AckTracker;
42use crate::elections::election_service::{
43 ELECTION_SERVICE_NAME, ElectionAcceptorState, ElectionService,
44};
45use crate::elections::master_tracker::MasterTracker;
46use crate::error::{RepError, Result};
47use crate::group_service::GroupService;
48use crate::master_transfer::MasterTransferConfig;
49use crate::net::service_dispatcher::TcpServiceDispatcher;
50use crate::network_restore::{NetworkRestore, NetworkRestoreConfig};
51use crate::network_restore_server::{
52 NetworkRestoreServer, RESTORE_SERVICE_NAME,
53};
54use crate::node_state::{NodeState, NodeStateMachine};
55use crate::rep_config::RepConfig;
56use crate::rep_stats::RepStats;
57use crate::state_change_listener::{StateChangeEvent, StateChangeListener};
58use crate::stream::feeder::Feeder;
59use crate::stream::peer_feeder::{
60 PEER_FEEDER_SERVICE_NAME, PeerFeederService, PeerLogScanner,
61};
62use crate::stream::replica_stream::{EnvironmentLogWriter, ReplicaStream};
63use crate::vlsn::vlsn_index::VlsnIndex;
64use crate::vlsn::vlsn_range::VlsnRange;
65
/// How long the master may stay silent before it is considered lost.
///
/// Handed to `MasterTracker::new` when a `ReplicatedEnvironment` is built.
/// The value is 30 seconds.
const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::new(30, 0);
68
/// A replicated database environment.
///
/// This is the entry point for replication. It wraps a standard Environment
/// and adds replication capabilities including master election, replica
/// streaming, and commit acknowledgments.
///
/// High Availability (HA) provides a replicated, embedded database
/// management system which provides fast, reliable, and scalable data
/// management. HA enables replication of an environment across a Replication
/// Group. A `ReplicatedEnvironment` is a single node in the replication group.
///
/// `ReplicatedEnvironment` wraps a standard `Environment`. All database
/// operations are executed in the same fashion in both replicated and
/// non-replicated applications. A `ReplicatedEnvironment` must be
/// transactional. All replicated databases created in the replicated
/// environment must be transactional as well.
///
/// # Construction
///
/// * [`ReplicatedEnvironment::new`] builds the node-local components and
///   tries to start the TCP service dispatcher. It does not start any
///   background thread; the caller drives state transitions.
/// * [`ReplicatedEnvironment::open`] wraps the result of `new` in an `Arc`
///   and additionally starts the election driver, the VLSN persistence
///   daemon and the ADMIN service. This is the path on which the node joins
///   its group on its own.
///
/// Replicated environments can be created with node type Electable or
/// Secondary. Electable nodes can be masters or replicas, and participate in
/// both master elections and commit durability decisions. Secondary nodes can
/// only be replicas, not masters, and do not participate in either elections or
/// durability decisions.
///
/// # Example
///
/// ```ignore
/// use noxu_rep::{ReplicatedEnvironment, RepConfig};
///
/// let config = RepConfig::builder("my_group", "node1", "localhost")
///     .node_port(5001)
///     .build();
/// let rep_env = ReplicatedEnvironment::new(config).unwrap();
/// ```
pub struct ReplicatedEnvironment {
    /// The replication configuration for this node.
    config: RepConfig,
    /// Tracks the current node state (Detached, Unknown, Master, Replica).
    node_state: NodeStateMachine,
    /// Service for managing the replication group membership.
    group_service: GroupService,
    /// Maps VLSNs to log file positions.
    ///
    /// Wrapped in `Arc` so that background daemons (election driver,
    /// VLSN-index persistence flusher) can share access without
    /// borrowing the env. Closes finding F11 (
    /// `docs/src/internal/api-audit-2026-05-rep.md`).
    vlsn_index: Arc<VlsnIndex>,
    /// Tracks acknowledgments from replicas (used by master).
    ack_tracker: AckTracker,
    /// Replication statistics.
    stats: RepStats,
    /// Active feeder threads (master -> replica streams).
    feeders: RwLock<Vec<Feeder>>,
    /// Replica stream for receiving updates from the master.
    replica_stream: ReplicaStream,
    /// Tracks the current master node.
    master_tracker: MasterTracker,
    /// State change listeners.
    listeners: RwLock<Vec<Arc<dyn StateChangeListener>>>,
    /// Shutdown flag.
    shutdown: AtomicBool,
    /// TCP service dispatcher — listens on the replication port and routes
    /// incoming connections to the appropriate service handler (feeder, etc.).
    ///
    /// Started in `new()`. `None` when the listen address built from
    /// `node_host`/`node_port` cannot be turned into a socket address, or
    /// when creating or starting the dispatcher fails. Those failures are
    /// logged as warnings by `new()` rather than returned as errors.
    tcp_dispatcher: Option<TcpServiceDispatcher>,
    /// The address the `tcp_dispatcher` is actually bound to (may differ from
    /// the configured port when port 0 is used in tests). `None` whenever
    /// `tcp_dispatcher` is `None`.
    bound_addr: Option<SocketAddr>,

    /// Optional live `EnvironmentImpl` wired in via [`with_environment`].
    ///
    /// When set, `become_master` spawns a `FeederRunner` per replica using
    /// `EnvironmentLogScanner`, and `become_replica` spawns a
    /// `ReplicaReceiver` thread using `EnvironmentLogWriter`.
    ///
    /// The VLSN persistence daemon also reads it to find the last
    /// checkpoint end used as the flush cap.
    env_impl: StdMutex<Option<Arc<EnvironmentImpl>>>,

    /// Join handles of the background threads spawned for this env.
    ///
    /// `start_election_driver` and `start_vlsn_persistence_daemon` push
    /// their handles here and scan the thread names in this list to avoid
    /// spawning a second copy of themselves.
    // NOTE(review): presumably `close()` drains and joins these handles —
    // `close()` is not visible in this part of the file; confirm.
    io_threads: StdMutex<Vec<std::thread::JoinHandle<()>>>,

    /// Shutdown flag shared with I/O threads so they terminate when the
    /// environment is closed.
    io_shutdown: AtomicBool,

    /// Whether the RESTORE service has been registered on the TCP dispatcher.
    ///
    /// When `config.env_home` is `None` at construction time, registration is
    /// deferred until `with_environment()` provides the env home path.
    restore_registered: AtomicBool,

    /// In-memory log queue used by the peer feeder service.
    ///
    /// When this node is a replica, `apply_entry()` pushes each received log
    /// entry here. The `PeerFeederService` registered on the TCP dispatcher
    /// reads from this queue to stream entries to downstream replicas that
    /// are behind this node (peer-to-peer log distribution, HA style).
    peer_scanner: Arc<PeerLogScanner>,

    /// Monotonic sequence used by `await_replica_acks` to assign unique
    /// keys to in-flight commits awaiting replica acknowledgment. In
    /// production this should track the real master VLSN; until F11
    /// closes the VLSN<->commit linkage, the coordinator uses a
    /// synthetic sequence so that ack tracking is unique per commit.
    /// See finding F1 in `docs/src/internal/api-audit-2026-05-rep.md`.
    /// Starts at 1.
    commit_ack_seq: std::sync::atomic::AtomicU64,

    /// Shared acceptor state used by the ELECTION service handler.
    /// The election driver updates `own_vlsn` / `own_term` here as the
    /// node progresses; incoming acceptor sessions read it on every
    /// connection so their replies always reflect the local node's
    /// most recent state. Closes finding F6.
    election_state: Arc<ElectionAcceptorState>,

    /// Self-referential `Weak` populated once the env has been wrapped
    /// in an `Arc`. Used by the replica I/O thread spawned in
    /// `become_replica` so it can call `bootstrap_via_dispatcher` when
    /// the master signals `NeedsRestore`.
    ///
    /// Populated lazily via [`Self::init_self_weak`] from `open()` and
    /// the test harness. When unset (callers that build the env via
    /// raw `Arc::new(Self::new(...))` and never call `init_self_weak`)
    /// the I/O thread falls back to operator-driven bootstrap.
    self_weak: OnceLock<Weak<Self>>,
}
206
207impl ReplicatedEnvironment {
208 /// Create a new replicated environment.
209 ///
210 ///
211 ///
212 /// Creates a replicated environment handle and starts participating in the
213 /// replication group. The node's state is determined when it joins the
214 /// group, and mastership is not preconfigured. If the group has no current
215 /// master, creation will trigger an election to determine whether this node
216 /// will participate as a Master or a Replica.
217 ///
218 /// A brand new node will always join an existing group as a Replica, unless
219 /// it is the very first electable node that is creating the group. In that
220 /// case it joins as the Master of the newly formed singleton group.
221 pub fn new(config: RepConfig) -> Result<Self> {
222 let node_state = NodeStateMachine::new();
223 let group_service = GroupService::new(config.group_name.clone());
224 let vlsn_index = {
225 // F11: try to load a previously persisted vlsn.idx from
226 // env_home if one exists. A successfully loaded index lets a
227 // restarted replica resume from where it left off without a
228 // full network restore; a missing or corrupt file falls back
229 // to a fresh in-memory index (caller will need to bootstrap).
230 if let Some(ref home) = config.env_home {
231 match crate::vlsn::persist::load_from_disk(home) {
232 Ok(Some(idx)) => {
233 log::info!(
234 "Node '{}' loaded persisted VLSN index from {} \
235 ({} entries, latest vlsn={})",
236 config.node_name,
237 home.display(),
238 idx.snapshot_entries().len(),
239 idx.get_latest_vlsn(),
240 );
241 Arc::new(idx)
242 }
243 Ok(None) => Arc::new(VlsnIndex::new(10)),
244 Err(e) => {
245 log::warn!(
246 "Node '{}' failed to load persisted VLSN index \
247 from {}: {} (treating as fresh node — network \
248 restore required)",
249 config.node_name,
250 home.display(),
251 e
252 );
253 // Best-effort: remove the corrupt file so the
254 // next persist cycle writes a clean one. A
255 // missing file is the "fresh node" baseline.
256 let _ = std::fs::remove_file(
257 crate::vlsn::persist::index_path(home),
258 );
259 Arc::new(VlsnIndex::new(10))
260 }
261 }
262 } else {
263 Arc::new(VlsnIndex::new(10))
264 }
265 };
266 let ack_tracker = AckTracker::new();
267 let stats = RepStats::new();
268 let feeders = RwLock::new(Vec::new());
269 let replica_stream = ReplicaStream::new();
270 let master_tracker = MasterTracker::new(DEFAULT_HEARTBEAT_TIMEOUT);
271
272 // Start the TCP service dispatcher.
273 //
274 // equivalent: `RepImpl.open()` calls `serviceDispatcher.start()`
275 // which binds a ServerSocketChannel on the configured port and begins
276 // accepting connections. We do the same here using the node_host and
277 // node_port from RepConfig.
278 let listen_addr_str =
279 format!("{}:{}", config.node_host, config.node_port);
280 let mut restore_registered_init = false;
281
282 let (tcp_dispatcher, bound_addr) = match listen_addr_str
283 .parse::<SocketAddr>()
284 {
285 Ok(addr) => {
286 match TcpServiceDispatcher::new(addr) {
287 Ok(dispatcher) => match dispatcher.start() {
288 Ok(bound) => {
289 // Register the network restore handler so any
290 // node in the group can request a full file-set
291 // copy from this node's environment.
292 if let Some(ref home) = config.env_home {
293 let restore_server =
294 NetworkRestoreServer::new(home.clone());
295 dispatcher.register(
296 RESTORE_SERVICE_NAME,
297 Arc::new(restore_server),
298 );
299 log::debug!(
300 "Node '{}' RESTORE service registered \
301 (env_home={})",
302 config.node_name,
303 home.display(),
304 );
305 restore_registered_init = true;
306 }
307 log::info!(
308 "Node '{}' TCP service dispatcher started on {}",
309 config.node_name,
310 bound
311 );
312 (Some(dispatcher), Some(bound))
313 }
314 Err(e) => {
315 log::warn!(
316 "Node '{}' failed to start TCP dispatcher on {}: {}",
317 config.node_name,
318 listen_addr_str,
319 e
320 );
321 (None, None)
322 }
323 },
324 Err(e) => {
325 log::warn!(
326 "Node '{}' failed to create TCP dispatcher: {}",
327 config.node_name,
328 e
329 );
330 (None, None)
331 }
332 }
333 }
334 Err(e) => {
335 log::warn!(
336 "Node '{}' cannot parse listen address '{}': {}",
337 config.node_name,
338 listen_addr_str,
339 e
340 );
341 (None, None)
342 }
343 };
344
345 // Build the in-memory peer log scanner; register the peer feeder
346 // service on the dispatcher so downstream replicas can connect.
347 let peer_scanner = Arc::new(PeerLogScanner::new());
348 // F5/F31: build the acceptor state with persistence enabled when
349 // env_home is configured. Crash-durable promises are required
350 // for the Paxos safety invariant after a process restart.
351 let election_state =
352 Arc::new(if let Some(ref home) = config.env_home {
353 ElectionAcceptorState::with_env_home(
354 config.node_name.clone(),
355 1,
356 home,
357 )
358 } else {
359 ElectionAcceptorState::new(config.node_name.clone(), 1)
360 });
361 if let Some(ref dispatcher) = tcp_dispatcher {
362 let service = PeerFeederService::new(Arc::clone(&peer_scanner));
363 dispatcher.register(PEER_FEEDER_SERVICE_NAME, Arc::new(service));
364 log::debug!(
365 "Node '{}' PEER_FEEDER service registered",
366 config.node_name,
367 );
368 // F6: register the ELECTION service so peers can run
369 // run_acceptor against this node when proposing.
370 let election_svc =
371 Arc::new(ElectionService::new(Arc::clone(&election_state)));
372 dispatcher.register(ELECTION_SERVICE_NAME, election_svc);
373 log::debug!(
374 "Node '{}' ELECTION service registered",
375 config.node_name,
376 );
377 }
378
379 let env = Self {
380 config,
381 node_state,
382 group_service,
383 vlsn_index,
384 ack_tracker,
385 stats,
386 feeders,
387 replica_stream,
388 master_tracker,
389 listeners: RwLock::new(Vec::new()),
390 shutdown: AtomicBool::new(false),
391 tcp_dispatcher,
392 bound_addr,
393 env_impl: StdMutex::new(None),
394 io_threads: StdMutex::new(Vec::new()),
395 io_shutdown: AtomicBool::new(false),
396 restore_registered: AtomicBool::new(restore_registered_init),
397 peer_scanner,
398 commit_ack_seq: std::sync::atomic::AtomicU64::new(1),
399 election_state,
400 self_weak: OnceLock::new(),
401 };
402
403 Ok(env)
404 }
405
406 /// Open a replicated environment with the standard production
407 /// lifecycle.
408 ///
409 /// This is the entry point recommended by the mdBook chapters:
410 /// it allocates the `ReplicatedEnvironment`, registers all
411 /// services on the TCP dispatcher, and spawns the **election
412 /// driver** background thread that runs Paxos rounds against
413 /// known peers until the node has resolved into either Master or
414 /// Replica state.
415 ///
416 /// Closes finding F6 of `docs/src/internal/api-audit-2026-05-rep.md`.
417 ///
418 /// Use [`ReplicatedEnvironment::new`] directly only when the
419 /// caller plans to drive state transitions explicitly (test
420 /// harnesses, scripted bootstrap, recovery tooling).
421 pub fn open(config: RepConfig) -> Result<Arc<Self>> {
422 let env = Arc::new(Self::new(config)?);
423 env.init_self_weak();
424 env.start_election_driver();
425 env.start_vlsn_persistence_daemon();
426 env.register_admin_service();
427 Ok(env)
428 }
429
430 /// Populate the env's self-referential `Weak` so background
431 /// threads can obtain a back-reference for auto-orchestrated
432 /// follow-up actions (e.g. replica auto-bootstrap on
433 /// `NeedsRestore`). Idempotent: subsequent calls are silent
434 /// no-ops because the inner [`OnceLock`] only accepts one set.
435 ///
436 /// Callers that wrap the env in `Arc` and want auto-bootstrap
437 /// behaviour should call this immediately after construction.
438 /// `Self::open` already does so. Test harnesses that drive
439 /// transitions manually (`RepTestBase`) also call this so the
440 /// auto-bootstrap path is exercised in tests.
441 pub fn init_self_weak(self: &Arc<Self>) {
442 let _ = self.self_weak.set(Arc::downgrade(self));
443 }
444
445 /// Register the `ADMIN` service handler on the TCP dispatcher.
446 ///
447 /// Closes findings F7 / F8. Holds a `Weak<Self>` so the handler
448 /// does not extend the env's lifetime. Idempotent: re-registering
449 /// is harmless because `TcpServiceDispatcher::register` overwrites
450 /// the existing handler.
451 pub fn register_admin_service(self: &Arc<Self>) {
452 if let Some(ref dispatcher) = self.tcp_dispatcher {
453 crate::group_admin::register_admin_service(
454 dispatcher,
455 Arc::downgrade(self),
456 );
457 log::debug!(
458 "Node '{}' ADMIN service registered",
459 self.config.node_name,
460 );
461 }
462 }
463
464 /// Spawn the VLSN-index persistence daemon (F11).
465 ///
466 /// Periodically (every 2 seconds) snapshots the in-memory
467 /// `VlsnIndex` to `<env_home>/vlsn.idx` so a clean restart can
468 /// resume from where the replica left off without a full network
469 /// restore. No-op when `config.env_home` is `None`.
470 ///
471 /// Idempotent: only one daemon is ever spawned per env.
472 pub fn start_vlsn_persistence_daemon(self: &Arc<Self>) {
473 let Some(home) = self.config.env_home.clone() else {
474 return;
475 };
476 {
477 let threads = self.io_threads.lock().unwrap();
478 if threads.iter().any(|h| {
479 h.thread()
480 .name()
481 .is_some_and(|n| n.starts_with("noxu-vlsn-flush-"))
482 }) {
483 return;
484 }
485 }
486
487 let vlsn_index = Arc::clone(&self.vlsn_index);
488 let name = format!("noxu-vlsn-flush-{}", self.config.node_name);
489 let me = Arc::clone(self);
490 let interval = Duration::from_secs(2);
491
492 let handle = std::thread::Builder::new()
493 .name(name)
494 .spawn(move || {
495 use std::sync::atomic::Ordering;
496 let mut last_persisted_vlsn: u64 = 0;
497 while !me.io_shutdown.load(Ordering::SeqCst)
498 && !me.is_shutdown()
499 {
500 std::thread::sleep(interval);
501 if me.io_shutdown.load(Ordering::SeqCst) {
502 break;
503 }
504 let latest = vlsn_index.get_latest_vlsn();
505 if latest == last_persisted_vlsn {
506 // Nothing new to flush.
507 continue;
508 }
509 // X-2: cap the flush at the last durable checkpoint's
510 // end LSN so the persisted VLSN index never claims
511 // VLSNs beyond the durable B-tree state. After a crash
512 // the recovered tree and the index will be coherent.
513 let cap_lsn = me
514 .env_impl
515 .lock()
516 .unwrap()
517 .as_ref()
518 .and_then(|e| e.get_checkpointer())
519 .map(|c| c.get_last_checkpoint_end())
520 .unwrap_or(noxu_util::NULL_LSN);
521 match crate::vlsn::persist::flush_to_disk_capped(
522 &vlsn_index,
523 &home,
524 cap_lsn,
525 ) {
526 Ok(n) => {
527 log::trace!(
528 "vlsn-flush: persisted {} entries (latest vlsn={}, cap_lsn={:?})",
529 n,
530 latest,
531 cap_lsn,
532 );
533 last_persisted_vlsn = latest;
534 }
535 Err(e) => {
536 log::warn!(
537 "vlsn-flush: failed to persist VLSN index to {}: {}",
538 home.display(),
539 e
540 );
541 }
542 }
543 }
544 // Final flush on shutdown so a clean close is recoverable.
545 // Cap at the last checkpoint even for the shutdown flush.
546 let cap_lsn = me
547 .env_impl
548 .lock()
549 .unwrap()
550 .as_ref()
551 .and_then(|e| e.get_checkpointer())
552 .map(|c| c.get_last_checkpoint_end())
553 .unwrap_or(noxu_util::NULL_LSN);
554 if let Err(e) = crate::vlsn::persist::flush_to_disk_capped(
555 &vlsn_index,
556 &home,
557 cap_lsn,
558 ) {
559 log::warn!(
560 "vlsn-flush (final): failed to persist VLSN index: {}",
561 e
562 );
563 }
564 })
565 .expect("failed to spawn noxu-vlsn-flush thread");
566
567 self.io_threads.lock().unwrap().push(handle);
568 log::debug!(
569 "Node '{}' VLSN persistence daemon started",
570 self.config.node_name,
571 );
572 }
573
574 /// Spawn the election driver background thread.
575 ///
576 /// While the env is in `Detached` or `Unknown` state and no master
577 /// is known, the driver periodically attempts a Paxos election
578 /// against peers in `GroupService` (whose ELECTION services were
579 /// registered at `open()` time). On success the driver calls
580 /// `become_master` (if this node is the winner) or `become_replica`
581 /// (otherwise). On failure (no quorum), the driver waits
582 /// `config.election_timeout` and tries again.
583 ///
584 /// The driver respects `io_shutdown`; on env close the loop exits
585 /// promptly.
586 ///
587 /// Idempotent: a second call is a no-op (only one driver thread is
588 /// ever spawned per env).
589 pub fn start_election_driver(self: &Arc<Self>) {
590 use std::sync::atomic::Ordering;
591 // Reuse io_shutdown for cancellation; a successful spawn is
592 // recorded by appending to io_threads, so a duplicate call
593 // would re-add a thread — we use a one-shot `AtomicBool`
594 // sentinel placed in the io_shutdown's slot via a new field.
595 // Cheaper: a static name check on io_threads is impossible;
596 // instead, gate spawning on whether any io_thread already
597 // carries the driver name.
598 {
599 let threads = self.io_threads.lock().unwrap();
600 if threads.iter().any(|h| {
601 h.thread()
602 .name()
603 .is_some_and(|n| n.starts_with("noxu-election-"))
604 }) {
605 return;
606 }
607 }
608
609 let me = Arc::clone(self);
610 let name = format!("noxu-election-{}", self.config.node_name);
611 let handle = std::thread::Builder::new()
612 .name(name)
613 .spawn(move || {
614 me.run_election_loop();
615 })
616 .expect("failed to spawn election driver thread");
617 self.io_threads.lock().unwrap().push(handle);
618 log::debug!("Node '{}' election driver started", self.config.node_name,);
619 // Keep ordering sane on the io_shutdown flag.
620 let _ = self.io_shutdown.load(Ordering::SeqCst);
621 }
622
623 /// Body of the election driver loop. Public only for tests; called
624 /// by [`Self::start_election_driver`].
625 fn run_election_loop(self: Arc<Self>) {
626 use std::sync::atomic::Ordering;
627 // Maintain an internal monotonically increasing election term.
628 // Each successful or failed round bumps the term so retries do
629 // not collide with stale acceptor promises.
630 let mut term: u64 = 1;
631
632 loop {
633 if self.io_shutdown.load(Ordering::SeqCst) {
634 return;
635 }
636 if self.is_shutdown() {
637 return;
638 }
639
640 let state = self.node_state.get_state();
641 // Stop driving once we've resolved into Master/Replica;
642 // re-arm only if the node returns to Unknown.
643 if matches!(state, NodeState::Master | NodeState::Replica) {
644 std::thread::sleep(Duration::from_millis(200));
645 continue;
646 }
647 if matches!(state, NodeState::Shutdown) {
648 return;
649 }
650
651 // Probe peers for an active master via the existing
652 // GroupService cache. In the absence of a heartbeat path
653 // we rely on master_tracker (set by become_replica from
654 // the receive loop).
655 if let Some(master_name) = self.master_tracker.get_master()
656 && master_name != self.config.node_name
657 {
658 let _ = self.become_replica(&master_name);
659 continue;
660 }
661
662 // Snapshot peers to dial for ELECTION.
663 let peers: Vec<(String, SocketAddr)> = self
664 .group_service
665 .get_all_nodes()
666 .into_iter()
667 .filter(|n| n.name != self.config.node_name)
668 .filter_map(|n| {
669 format!("{}:{}", n.host, n.port)
670 .parse::<SocketAddr>()
671 .ok()
672 .map(|a| (n.name, a))
673 })
674 .collect();
675
676 // Build the local rep group view used by run_election to
677 // compute quorum and resolve the winner name. Include
678 // self.
679 let group = self.local_rep_group_with_self();
680
681 // Update election state for any concurrent acceptor calls.
682 let our_vlsn = self.vlsn_index.get_latest_vlsn();
683 self.election_state.set_vlsn(our_vlsn);
684 self.election_state.set_term(term);
685
686 // Connect to each peer's ELECTION service. Failures are
687 // tolerated: a peer that doesn't answer simply contributes
688 // no vote. The election may still reach quorum in the
689 // remaining peers.
690 let mut channels: Vec<Arc<dyn crate::net::channel::Channel>> =
691 Vec::new();
692 for (peer_name, addr) in &peers {
693 match crate::net::service_dispatcher::connect_to_service(
694 *addr,
695 ELECTION_SERVICE_NAME,
696 ) {
697 Ok(ch) => {
698 let arc: Arc<dyn crate::net::channel::Channel> =
699 Arc::new(ch);
700 channels.push(arc);
701 }
702 Err(e) => {
703 log::trace!(
704 "election driver: peer {} ({}) unreachable: {}",
705 peer_name,
706 addr,
707 e
708 );
709 }
710 }
711 }
712
713 // Resolve our own node_id from the group; if not present
714 // we cannot run an election (closed-world guard — see F22).
715 let self_node_id =
716 group.get_node(&self.config.node_name).map(|n| n.node_id());
717 let self_node_id = match self_node_id {
718 Some(id) => id,
719 None => {
720 log::warn!(
721 "election driver: node '{}' not registered in \
722 own group view; sleeping",
723 self.config.node_name
724 );
725 std::thread::sleep(Duration::from_millis(200));
726 continue;
727 }
728 };
729
730 log::debug!(
731 "election driver on '{}': starting term={} with {} peers",
732 self.config.node_name,
733 term,
734 channels.len(),
735 );
736 let outcome = crate::elections::paxos::run_election(
737 self_node_id,
738 &self.config.node_name,
739 &group,
740 &channels,
741 our_vlsn,
742 /* priority */ 1,
743 term,
744 );
745
746 match outcome {
747 Some(winner_id) if winner_id == self_node_id => {
748 if let Err(e) = self.become_master(term) {
749 log::warn!(
750 "election driver: become_master failed: {}",
751 e
752 );
753 } else {
754 log::info!(
755 "election driver: '{}' became master at term {}",
756 self.config.node_name,
757 term,
758 );
759 }
760 }
761 Some(winner_id) => {
762 if let Some(winner_node) = group
763 .get_nodes()
764 .into_iter()
765 .find(|n| n.node_id() == winner_id)
766 {
767 if let Err(e) = self.become_replica(&winner_node.name) {
768 log::warn!(
769 "election driver: become_replica failed: {}",
770 e
771 );
772 } else {
773 log::info!(
774 "election driver: '{}' became replica of '{}' at term {}",
775 self.config.node_name,
776 winner_node.name,
777 term,
778 );
779 }
780 }
781 }
782 None => {
783 log::debug!(
784 "election driver on '{}' term={}: no quorum",
785 self.config.node_name,
786 term,
787 );
788 }
789 }
790
791 term = term.saturating_add(1);
792 // Back off so we don't pin the loop on transient failures.
793 std::thread::sleep(
794 self.config.election_timeout.min(Duration::from_millis(500)),
795 );
796 }
797 }
798
799 /// Internal: a `RepGroup` snapshot that includes self.
800 fn local_rep_group_with_self(&self) -> crate::rep_group::RepGroup {
801 let mut group = self.get_rep_group();
802 // Ensure self is present in the group view; the
803 // group_service does not auto-register the local node.
804 if group.get_node(&self.config.node_name).is_none() {
805 let mut self_node = crate::rep_node::RepNode::new(
806 self.config.node_name.clone(),
807 self.config.node_type,
808 self.config.node_host.clone(),
809 self.config.node_port,
810 /* node_id */ 0,
811 );
812 // Stable self node_id derived from the name hash so
813 // re-creations in the same process don't collide.
814 use std::hash::{Hash, Hasher};
815 let mut hasher = std::collections::hash_map::DefaultHasher::new();
816 self.config.node_name.hash(&mut hasher);
817 // Restrict to a u32 range and avoid 0 (reserved for
818 // "unknown").
819 let id = ((hasher.finish() as u32) | 1).max(1);
820 self_node.node_id = id;
821 group.add_node(self_node);
822 }
823 group
824 }
825
826 /// Return the socket address the TCP service dispatcher is bound to.
827 ///
828 /// This may differ from the configured `node_port` when port 0 is used
829 /// (the OS assigns an ephemeral port). Returns `None` if the dispatcher
830 /// could not be started (e.g. the address is not resolvable).
831 pub fn bound_addr(&self) -> Option<SocketAddr> {
832 self.bound_addr
833 }
834
835 /// Wire a live `EnvironmentImpl` into this replicated environment.
836 ///
837 /// After this call, state transitions (`become_master`, `become_replica`)
838 /// will spawn real feeder/receiver I/O threads backed by the live log.
839 ///
840 /// If the RESTORE service was not registered at construction time (because
841 /// `config.env_home` was `None`), it is registered here using the
842 /// environment's actual home path. This mirrors`RepNode.envSetup()`
843 /// which registers the restore handler during environment wiring.
844 ///
845 /// Environment reference wiring.
846 /// `EnvironmentImpl` via `RepImpl.repNode.envImpl` in HA.
847 pub fn with_environment(&self, env: Arc<EnvironmentImpl>) {
848 // Register RESTORE service lazily if not already done.
849 if !self.restore_registered.load(Ordering::SeqCst)
850 && let Some(ref dispatcher) = self.tcp_dispatcher
851 {
852 let env_home = env.get_env_home().to_path_buf();
853 let restore_server = NetworkRestoreServer::new(env_home.clone());
854 dispatcher.register(RESTORE_SERVICE_NAME, Arc::new(restore_server));
855 self.restore_registered.store(true, Ordering::SeqCst);
856 log::debug!(
857 "Node '{}' RESTORE service registered via with_environment \
858 (env_home={})",
859 self.config.node_name,
860 env_home.display(),
861 );
862 }
863
864 // X-14: rebuild the VLSN index from recovery-replayed LN entries.
865 // After a crash the on-disk vlsn.idx may be stale (either ahead of
866 // the recovered B-tree, or behind if vlsn.idx was not flushed
867 // after the last checkpoint). Re-registering all (vlsn, lsn) pairs
868 // from the redo pass gives a consistent in-memory index.
869 if !env.recovery_vlsns.is_empty() {
870 log::info!(
871 "Node '{}': rebuilding VLSN index from {} recovered entries",
872 self.config.node_name,
873 env.recovery_vlsns.len(),
874 );
875 for &(vlsn, lsn_u64) in &env.recovery_vlsns {
876 let lsn = noxu_util::Lsn::from_u64(lsn_u64);
877 self.vlsn_index.register(
878 vlsn,
879 lsn.file_number(),
880 lsn.file_offset(),
881 );
882 }
883 }
884
885 // X-1: truncate the VLSN index to the rollback matchpoint if recovery
886 // detected a completed rollback period. The matchpoint is the highest
887 // LSN that is still valid after the rollback; entries with higher VLSNs
888 // correspond to data that was rolled back and must not appear in the
889 // index.
890 if let Some(matchpoint_lsn_u64) = env.recovery_rollback_matchpoint {
891 // Find the latest VLSN whose LSN is at or before the matchpoint.
892 // Scan the recovered VLSN pairs (sorted ascending) to find the
893 // boundary.
894 let safe_vlsn = env
895 .recovery_vlsns
896 .iter()
897 .rev()
898 .find(|&&(_, lsn_u64)| lsn_u64 <= matchpoint_lsn_u64)
899 .map(|&(vlsn, _)| vlsn)
900 .unwrap_or(0);
901 log::info!(
902 "Node '{}': truncating VLSN index after vlsn={} \
903 (rollback matchpoint lsn={:#x})",
904 self.config.node_name,
905 safe_vlsn,
906 matchpoint_lsn_u64,
907 );
908 self.vlsn_index.truncate_after(safe_vlsn);
909 }
910
911 *self.env_impl.lock().unwrap() = Some(env);
912 }
913
914 /// Get the current node state.
915 ///
916 ///
917 ///
918 /// Returns the current state of the node associated with this replication
919 /// environment. If the caller's intent is to track the state of the node,
920 /// `StateChangeListener` may be a more convenient and efficient approach.
921 pub fn get_state(&self) -> NodeState {
922 self.node_state.get_state()
923 }
924
925 /// Check if this node is the master.
926 ///
927 /// Returns true if the node's current state is Master.
928 pub fn is_master(&self) -> bool {
929 self.node_state.get_state() == NodeState::Master
930 }
931
932 /// Check if this node is a replica.
933 ///
934 /// Returns true if the node's current state is Replica.
935 pub fn is_replica(&self) -> bool {
936 self.node_state.get_state() == NodeState::Replica
937 }
938
939 /// Returns true if the node is currently participating in the group
940 /// as a Replica or a Master.
941 pub fn is_active(&self) -> bool {
942 self.node_state.get_state().is_active()
943 }
944
945 /// Get the node name.
946 ///
947 ///
948 ///
949 /// Returns the unique name used to identify this replicated environment.
950 pub fn get_node_name(&self) -> &str {
951 self.config.node_name.as_str()
952 }
953
954 /// Get the group name.
955 ///
956 /// Returns the name of the replication group this node belongs to.
957 pub fn get_group_name(&self) -> &str {
958 self.config.group_name.as_str()
959 }
960
961 /// Get the current master (if known).
962 ///
963 /// Returns the name of the node that is currently the master, or None
964 /// if the master is not known (e.g. the node is in the Unknown or
965 /// Detached state).
966 pub fn get_master_name(&self) -> Option<String> {
967 self.master_tracker.get_master()
968 }
969
970 /// Get the replication group info.
971 ///
972 ///
973 ///
974 /// Returns a description of the replication group as known by this node.
975 /// The replicated group metadata is stored in a replicated database and
976 /// updates are propagated by the current master node to all replicas. If
977 /// this node is not the master, it is possible for its description of the
978 /// group to be out of date.
979 pub fn get_group(&self) -> &GroupService {
980 &self.group_service
981 }
982
983 /// Add a peer node to the replication group at runtime.
984 ///
985 /// The node is registered in the `GroupService` so elections and quorum
986 /// calculations immediately reflect the new membership.
987 pub fn add_peer(&self, node: crate::rep_node::RepNode) -> Result<()> {
988 use crate::group_service::NodeInfo;
989 use std::time::Instant;
990
991 let info = NodeInfo {
992 name: node.name.clone(),
993 node_type: node.node_type,
994 host: node.host.clone(),
995 port: node.port,
996 node_id: node.node_id,
997 joined_at: Instant::now(),
998 last_seen: Instant::now(),
999 is_active: true,
1000 known_vlsn: 0,
1001 log_range: None,
1002 read_capacity_pct: node.read_capacity_pct,
1003 write_capacity_pct: node.write_capacity_pct,
1004 latency_hint_ms: node.latency_hint_ms,
1005 };
1006 self.group_service.add_node(info)?;
1007 log::info!(
1008 "Node '{}': added peer '{}' ({}:{}) to group '{}'",
1009 self.config.node_name,
1010 node.name,
1011 node.host,
1012 node.port,
1013 self.config.group_name,
1014 );
1015
1016 // F9: if we are the current master, immediately register a
1017 // `Feeder` tracker for the new peer so AckTracker bookkeeping
1018 // and downstream pull-based streaming work without a forced
1019 // re-election.
1020 if self.is_master()
1021 && (node.node_type == crate::node_type::NodeType::Electable
1022 || node.node_type == crate::node_type::NodeType::Secondary)
1023 {
1024 let mut feeders = self.feeders.write();
1025 if !feeders.iter().any(|f| f.get_replica_name() == node.name) {
1026 feeders.push(Feeder::new(node.name.clone()));
1027 log::debug!(
1028 "Node '{}' (master): dispatched Feeder for new peer '{}'",
1029 self.config.node_name,
1030 node.name,
1031 );
1032 }
1033 }
1034 Ok(())
1035 }
1036
1037 /// Remove a peer node from the replication group by name.
1038 ///
1039 /// The node is deregistered from the `GroupService`. Elections initiated
1040 /// after this call will not include the removed node in quorum calculations.
1041 pub fn remove_peer(&self, name: &str) -> Result<()> {
1042 self.group_service.remove_node(name)?;
1043 log::info!(
1044 "Node '{}': removed peer '{}' from group '{}'",
1045 self.config.node_name,
1046 name,
1047 self.config.group_name,
1048 );
1049 Ok(())
1050 }
1051
1052 /// Update the capacity and latency metadata of an existing peer.
1053 ///
1054 /// Only the following fields are updated from `node`:
1055 /// - `read_capacity_pct`
1056 /// - `write_capacity_pct`
1057 /// - `latency_hint_ms`
1058 ///
1059 /// The node's identity (name, address, port, node_type) is preserved.
1060 /// Safe to call while replication is active.
1061 ///
1062 /// If the quorum policy is `Flexible` or `Expression`, the quorum system
1063 /// is rebuilt to reflect the new capacity/latency weights.
1064 ///
1065 /// # Note
1066 ///
1067 /// `update_peer_metadata` does not currently re-run
1068 /// `QuorumPolicy::validate(electable_count)` after the metadata
1069 /// change. An LP-optimal `Expression` quorum that was safe before
1070 /// the update may no longer satisfy the intersection property
1071 /// afterwards. Until automatic revalidation lands, deployments
1072 /// using `QuorumPolicy::Expression` should call
1073 /// `quorum_policy().validate(get_rep_group().electable_count())`
1074 /// on the returned `RepGroup` after every metadata change and
1075 /// fail the operator-facing operation if validation reports
1076 /// unsafety.
1077 pub fn update_peer_metadata(
1078 &self,
1079 name: &str,
1080 node: crate::rep_node::RepNode,
1081 ) -> Result<()> {
1082 self.group_service.update_node_metadata(
1083 name,
1084 node.read_capacity_pct,
1085 node.write_capacity_pct,
1086 node.latency_hint_ms,
1087 )?;
1088 log::info!(
1089 "Node '{}': updated metadata for peer '{}' \
1090 (read_cap={}, write_cap={}, latency={}ms)",
1091 self.config.node_name,
1092 name,
1093 node.read_capacity_pct,
1094 node.write_capacity_pct,
1095 node.latency_hint_ms,
1096 );
1097 Ok(())
1098 }
1099
1100 /// Returns a snapshot of the current replication group as a `RepGroup`.
1101 ///
1102 /// The snapshot reflects the state at the time of the call; subsequent
1103 /// `add_peer` / `remove_peer` calls are not reflected in it.
1104 pub fn get_rep_group(&self) -> crate::rep_group::RepGroup {
1105 use crate::rep_group::RepGroup;
1106
1107 let mut group = RepGroup::new(
1108 self.config.group_name.clone(),
1109 self.group_service.get_group_id(),
1110 );
1111 for info in self.group_service.get_all_nodes() {
1112 let mut node = crate::rep_node::RepNode::new(
1113 info.name.clone(),
1114 info.node_type,
1115 info.host.clone(),
1116 info.port,
1117 info.node_id,
1118 );
1119 node.read_capacity_pct = info.read_capacity_pct;
1120 node.write_capacity_pct = info.write_capacity_pct;
1121 node.latency_hint_ms = info.latency_hint_ms;
1122 group.add_node(node);
1123 }
1124 group
1125 }
1126
1127 /// Get the replication configuration.
1128 ///
1129 ///
1130 ///
1131 /// Returns the replication configuration that has been used to create this
1132 /// environment.
1133 pub fn get_config(&self) -> &RepConfig {
1134 &self.config
1135 }
1136
1137 /// Get the current VLSN range on this node.
1138 ///
1139 /// Returns the range of VLSNs currently available on this node.
1140 pub fn get_vlsn_range(&self) -> VlsnRange {
1141 self.vlsn_index.get_range()
1142 }
1143
1144 /// Get the latest VLSN.
1145 ///
1146 /// Returns the most recent VLSN registered on this node.
1147 pub fn get_current_vlsn(&self) -> u64 {
1148 self.vlsn_index.get_latest_vlsn()
1149 }
1150
1151 /// Return the list of replica names that currently have a `Feeder`
1152 /// tracker on this (master) node.
1153 ///
1154 /// Used by tests and operator tooling. The returned list reflects
1155 /// the master's view at the time of the call; subsequent
1156 /// `add_peer`/`remove_peer` calls may change it.
1157 pub fn feeder_replica_names(&self) -> Vec<String> {
1158 self.feeders.read().iter().map(|f| f.get_replica_name()).collect()
1159 }
1160
1161 /// Bootstrap this node's environment by network-restoring all `.ndb`
1162 /// files from `peer_name` via the dispatcher's RESTORE service.
1163 ///
1164 /// Closes findings F2 / F4 of `docs/src/internal/api-audit-2026-05-rep.md`.
1165 ///
1166 /// The standalone `NetworkRestore::execute()` opens raw TCP and
1167 /// expects to drive the legacy `NetworkRestoreServer::start` listener.
1168 /// Production replicated environments host the RESTORE handler on the
1169 /// dispatcher, so this method routes through `execute_via_dispatcher`.
1170 ///
1171 /// `peer_name` must be a known peer in `GroupService`; on success the
1172 /// peer's `.ndb` files are written into `config.env_home`. Returns
1173 /// `Err` if `env_home` is `None`, the peer is unknown, or the restore
1174 /// fails for any reason.
1175 pub fn bootstrap_via_dispatcher(&self, peer_name: &str) -> Result<()> {
1176 let env_home = self.config.env_home.clone().ok_or_else(|| {
1177 RepError::ConfigError(
1178 "bootstrap_via_dispatcher requires env_home in RepConfig"
1179 .into(),
1180 )
1181 })?;
1182 let peer_info = self
1183 .group_service
1184 .get_all_nodes()
1185 .into_iter()
1186 .find(|n| n.name == peer_name)
1187 .ok_or_else(|| {
1188 RepError::ConfigError(format!(
1189 "peer '{}' not registered in group '{}'",
1190 peer_name, self.config.group_name,
1191 ))
1192 })?;
1193
1194 let cfg = NetworkRestoreConfig {
1195 source_node: peer_info.name.clone(),
1196 source_host: peer_info.host.clone(),
1197 source_port: peer_info.port,
1198 retain_log_files: true,
1199 };
1200 let restore = NetworkRestore::new(cfg).with_local_dir(env_home);
1201 restore.execute_via_dispatcher()?;
1202 log::info!(
1203 "Node '{}' bootstrapped via dispatcher from '{}' ({}:{})",
1204 self.config.node_name,
1205 peer_info.name,
1206 peer_info.host,
1207 peer_info.port,
1208 );
1209 Ok(())
1210 }
1211
1212 /// Get replication statistics.
1213 ///
1214 ///
1215 ///
1216 /// Returns statistics associated with this environment.
1217 pub fn get_stats(&self) -> &RepStats {
1218 &self.stats
1219 }
1220
1221 /// Get the ack tracker.
1222 pub fn get_ack_tracker(&self) -> &AckTracker {
1223 &self.ack_tracker
1224 }
1225
1226 /// Ensure the node state machine is in Unknown state, transitioning
1227 /// from Detached if necessary. This is needed because the state machine
1228 /// only allows Detached -> Unknown -> Master/Replica.
1229 pub fn ensure_unknown_state(&self) -> Result<()> {
1230 let current = self.node_state.get_state();
1231 match current {
1232 NodeState::Unknown => Ok(()),
1233 NodeState::Detached => {
1234 self.node_state.transition_to(NodeState::Unknown)?;
1235 Ok(())
1236 }
1237 // Master and Replica must transition through Unknown before
1238 // joining a new group or reconnecting.
1239 NodeState::Master | NodeState::Replica => {
1240 self.node_state.transition_to(NodeState::Unknown)?;
1241 Ok(())
1242 }
1243 NodeState::Shutdown => {
1244 Err(RepError::StateError("Node is shut down".to_string()))
1245 }
1246 }
1247 }
1248
1249 /// Transition to master state.
1250 ///
1251 /// Transitions this node to Master state for the given election term.
1252 /// As master, the node can accept write operations and feed log entries
1253 /// to replicas.
1254 ///
1255 /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
1256 /// a `FeederRunner` + `EnvironmentLogScanner` background thread is spawned
1257 /// for each currently-registered replica (feeder entries in `feeders`).
1258 ///
1259 /// In HA.
1260 pub fn become_master(&self, term: u64) -> Result<()> {
1261 if self.is_shutdown() {
1262 return Err(RepError::StateError(
1263 "Cannot become master: environment is closed".to_string(),
1264 ));
1265 }
1266
1267 // JE invariant: only `Electable` nodes can become master. `Secondary`,
1268 // `Monitor`, and `Arbiter` are not electable and must be rejected at
1269 // the API layer (mirrors JE `ExceptionTest`). See
1270 // `NodeType::can_be_master`.
1271 if !self.config.node_type.can_be_master() {
1272 return Err(RepError::InvalidStateTransition(format!(
1273 "node '{}' has type {} which is not electable as master",
1274 self.config.node_name.as_str(),
1275 self.config.node_type,
1276 )));
1277 }
1278
1279 // Ensure we can reach Master state (may need Detached -> Unknown first)
1280 self.ensure_unknown_state()?;
1281
1282 let old_state = self.node_state.get_state();
1283 self.node_state.transition_to(NodeState::Master)?;
1284 self.master_tracker.set_master(self.config.node_name.as_str(), term);
1285
1286 // --- F9: spawn Feeder trackers for each known replica -------------
1287 //
1288 // Closes finding F9 of `docs/src/internal/api-audit-2026-05-rep.md`.
1289 // The architecture is pull-based: replicas pull from the master's
1290 // `PEER_FEEDER` service via `catch_up_from_peer`. However, the
1291 // master must:
1292 // 1. Track each replica via a `Feeder` so AckTracker bookkeeping
1293 // can attribute replica acks to the right node.
1294 // 2. Push its own writes into `peer_scanner` so replicas pulling
1295 // from `PEER_FEEDER` actually receive entries (`replicate_entry`).
1296 //
1297 // Here we ensure step 1: every known electable peer in the group
1298 // gets a `Feeder` entry.
1299 {
1300 let mut feeders = self.feeders.write();
1301 // Drop any stale feeders left over from a prior role. A
1302 // `Feeder` is just an in-memory tracker; recreating it is
1303 // cheap and avoids state inversion bugs across role changes.
1304 feeders.clear();
1305 for peer in self.group_service.get_all_nodes() {
1306 if peer.name == self.config.node_name {
1307 continue;
1308 }
1309 if peer.node_type != crate::node_type::NodeType::Electable
1310 && peer.node_type != crate::node_type::NodeType::Secondary
1311 {
1312 // Arbiters do not receive log entries.
1313 continue;
1314 }
1315 feeders.push(Feeder::new(peer.name.clone()));
1316 log::debug!(
1317 "Node '{}' (master, term={}): registered Feeder for \
1318 replica '{}'",
1319 self.config.node_name.as_str(),
1320 term,
1321 peer.name,
1322 );
1323 }
1324 }
1325
1326 // For observability, log the count.
1327 log::info!(
1328 "Node '{}' became master for term {} \
1329 (feeder trackers: {} known replicas)",
1330 self.config.node_name.as_str(),
1331 term,
1332 self.feeders.read().len(),
1333 );
1334
1335 // -------------------------------------------------------------------
1336
1337 // Notify listeners
1338 self.notify_listeners(old_state, NodeState::Master);
1339
1340 Ok(())
1341 }
1342
1343 /// Transition to replica state with the given master.
1344 ///
1345 /// Transitions this node to Replica state. The node will receive log
1346 /// entries from the specified master.
1347 ///
1348 /// If a live `EnvironmentImpl` has been wired in via `with_environment`,
1349 /// the method prepares an `EnvironmentLogWriter` so that replicated
1350 /// entries can be written to the local log. The actual network connection
1351 /// is established by the `TcpServiceDispatcher`; this method logs intent.
1352 ///
1353 /// In HA.
1354 pub fn become_replica(&self, master_name: &str) -> Result<()> {
1355 if self.is_shutdown() {
1356 return Err(RepError::StateError(
1357 "Cannot become replica: environment is closed".to_string(),
1358 ));
1359 }
1360
1361 // Ensure we can reach Replica state (may need Detached -> Unknown first)
1362 self.ensure_unknown_state()?;
1363
1364 let old_state = self.node_state.get_state();
1365 self.node_state.transition_to(NodeState::Replica)?;
1366 self.master_tracker.set_master(master_name, 0);
1367 self.replica_stream.set_master(master_name);
1368 self.replica_stream.set_state(
1369 crate::stream::replica_stream::ReplicaStreamState::Connecting,
1370 );
1371
1372 // --- G19: start replica receive loop --------------------------------
1373 //
1374 // Connects to the master's PEER_FEEDER service and runs a
1375 // ReplicaReceiver loop in a background thread. The receiver writes
1376 // replicated entries via EnvironmentLogWriter.
1377 if let Some(env) = self.env_impl.lock().unwrap().clone() {
1378 if let Some(log_mgr) = env.get_log_manager() {
1379 let vlsn_index =
1380 Arc::new(crate::vlsn::vlsn_index::VlsnIndex::new(10));
1381
1382 // Resolve the master's socket address from the GroupService.
1383 let master_addr_opt: Option<SocketAddr> = self
1384 .group_service
1385 .get_all_nodes()
1386 .iter()
1387 .find(|n| n.name == master_name)
1388 .and_then(|info| {
1389 format!("{}:{}", info.host, info.port)
1390 .parse::<SocketAddr>()
1391 .ok()
1392 });
1393
1394 let node_name = self.config.node_name.clone();
1395 let master = master_name.to_string();
1396 let vlsn_index_clone = Arc::clone(&vlsn_index);
1397 let shutdown_flag = self.io_shutdown.load(Ordering::SeqCst);
1398 // Wave 9-A fix 2: capture a Weak<Self> so the I/O thread
1399 // can call `bootstrap_via_dispatcher` automatically when
1400 // the master signals `NeedsRestore`. When the env was
1401 // never registered with `init_self_weak` (raw
1402 // `Arc::new(Self::new(...))` without going through
1403 // `open()` or the test harness), the weak ref is `None`
1404 // and we fall back to operator-driven bootstrap.
1405 let self_weak: Option<Weak<Self>> =
1406 self.self_weak.get().cloned();
1407
1408 let handle = std::thread::Builder::new()
1409 .name(format!("noxu-replica-{}", node_name))
1410 .spawn(move || {
1411 let mut writer = EnvironmentLogWriter::new(
1412 log_mgr,
1413 vlsn_index_clone,
1414 );
1415
1416 let Some(addr) = master_addr_opt else {
1417 log::warn!(
1418 "noxu-replica-{}: master '{}' address not in RepGroup; \
1419 waiting for TCP dispatcher connection",
1420 node_name, master,
1421 );
1422 return;
1423 };
1424
1425 // Catch-up loop: catch up, observe NeedsRestore,
1426 // optionally auto-bootstrap, retry once. We cap
1427 // the retry count at MAX_AUTO_BOOTSTRAP_ATTEMPTS
1428 // (small) so a misbehaving master does not loop
1429 // forever consuming network bandwidth.
1430 const MAX_AUTO_BOOTSTRAP_ATTEMPTS: u32 = 2;
1431 let mut attempts: u32 = 0;
1432 loop {
1433 log::info!(
1434 "noxu-replica-{}: connecting to master '{}' at {}",
1435 node_name, master, addr,
1436 );
1437 match crate::stream::peer_feeder::catch_up_from_peer(
1438 addr, 0, &mut writer,
1439 ) {
1440 Ok(true) => {
1441 log::info!(
1442 "noxu-replica-{}: catch-up complete from '{}'",
1443 node_name, master,
1444 );
1445 return;
1446 }
1447 Ok(false) => {
1448 // F2/F4: master signals NeedsRestore.
1449 // Wave 9-A fix 2: if a Weak<Self> was
1450 // plumbed in, upgrade it and call
1451 // `bootstrap_via_dispatcher` ourselves
1452 // so the replica auto-bootstraps and
1453 // resumes catch-up without operator
1454 // intervention.
1455 log::warn!(
1456 "noxu-replica-{}: master '{}' requires restore",
1457 node_name, master,
1458 );
1459 attempts += 1;
1460 if attempts > MAX_AUTO_BOOTSTRAP_ATTEMPTS {
1461 log::error!(
1462 "noxu-replica-{}: exceeded \
1463 auto-bootstrap attempts ({}); giving up",
1464 node_name,
1465 MAX_AUTO_BOOTSTRAP_ATTEMPTS,
1466 );
1467 return;
1468 }
1469 let env_arc = match self_weak
1470 .as_ref()
1471 .and_then(Weak::upgrade)
1472 {
1473 Some(e) => e,
1474 None => {
1475 // No back-ref or env dropped:
1476 // fall back to operator-driven
1477 // bootstrap and exit cleanly.
1478 log::warn!(
1479 "noxu-replica-{}: no back-reference \
1480 available; operator must call \
1481 bootstrap_via_dispatcher manually",
1482 node_name,
1483 );
1484 return;
1485 }
1486 };
1487 if env_arc.is_shutdown() {
1488 return;
1489 }
1490 log::info!(
1491 "noxu-replica-{}: auto-bootstrapping via \
1492 dispatcher from '{}' (attempt {})",
1493 node_name, master, attempts,
1494 );
1495 match env_arc
1496 .bootstrap_via_dispatcher(&master)
1497 {
1498 Ok(()) => {
1499 log::info!(
1500 "noxu-replica-{}: auto-bootstrap \
1501 succeeded; resuming catch-up",
1502 node_name,
1503 );
1504 // Drop the strong ref before
1505 // re-entering catch-up so we
1506 // do not keep the env alive
1507 // longer than necessary.
1508 drop(env_arc);
1509 continue;
1510 }
1511 Err(e) => {
1512 log::error!(
1513 "noxu-replica-{}: auto-bootstrap \
1514 failed: {}",
1515 node_name, e,
1516 );
1517 return;
1518 }
1519 }
1520 }
1521 Err(e) => {
1522 if !shutdown_flag {
1523 log::error!(
1524 "noxu-replica-{}: error from master '{}': {e}",
1525 node_name, master,
1526 );
1527 }
1528 return;
1529 }
1530 }
1531 }
1532 })
1533 .expect("failed to spawn noxu-replica thread");
1534
1535 self.io_threads.lock().unwrap().push(handle);
1536
1537 log::debug!(
1538 "Node '{}': replica receive thread started for master '{}'",
1539 self.config.node_name.as_str(),
1540 master_name,
1541 );
1542 } else {
1543 log::warn!(
1544 "Node '{}': no LogManager available (read-only env?); \
1545 replica I/O loop not started",
1546 self.config.node_name.as_str(),
1547 );
1548 }
1549 }
1550 // -------------------------------------------------------------------
1551
1552 // Notify listeners
1553 self.notify_listeners(old_state, NodeState::Replica);
1554
1555 log::info!(
1556 "Node '{}' became replica of master '{}'",
1557 self.config.node_name.as_str(),
1558 master_name
1559 );
1560 Ok(())
1561 }
1562
1563 /// Initiate a master transfer to the target node.
1564 ///
1565 ///
1566 ///
1567 /// Transfers the current master state from this node to one of the
1568 /// electable replicas. The replica that is actually chosen to be the new
1569 /// master is the one with which the Master Transfer can be completed most
1570 /// rapidly. The transfer operation ensures that all changes at this node
1571 /// are available at the new master upon conclusion of the operation.
1572 pub fn transfer_master(&self, config: MasterTransferConfig) -> Result<()> {
1573 if self.is_shutdown() {
1574 return Err(RepError::StateError(
1575 "Cannot transfer master: environment is closed".to_string(),
1576 ));
1577 }
1578
1579 if !self.is_master() {
1580 return Err(RepError::InvalidState(
1581 "Master transfer can only be initiated on the master node"
1582 .to_string(),
1583 ));
1584 }
1585
1586 log::info!(
1587 "Node '{}' initiating master transfer to '{}'",
1588 self.config.node_name.as_str(),
1589 config.target_node,
1590 );
1591
1592 // Closes finding F7 of `docs/src/internal/api-audit-2026-05-rep.md`.
1593 //
1594 // Steps:
1595 // 1. Locate the target's address.
1596 // 2. Compute the new term (current observed term + 1).
1597 // 3. Send TRANSFER_MASTER to the target — it will become master.
1598 // 4. Send TRANSFER_MASTER (with the same term + new master name) to
1599 // every other peer so they re-target.
1600 // 5. Demote self to Replica of the target.
1601 //
1602 // The transfer is best-effort: a peer that doesn't ack is logged
1603 // and skipped. The election driver will reconcile any divergence
1604 // on the next election round.
1605
1606 let target_addr = self
1607 .group_service
1608 .get_all_nodes()
1609 .into_iter()
1610 .find(|n| n.name == config.target_node)
1611 .and_then(|n| {
1612 format!("{}:{}", n.host, n.port)
1613 .parse::<std::net::SocketAddr>()
1614 .ok()
1615 })
1616 .ok_or_else(|| {
1617 RepError::ConfigError(format!(
1618 "transfer_master: target '{}' not registered or has bad address",
1619 config.target_node
1620 ))
1621 })?;
1622
1623 let new_term = self.master_tracker.get_term().saturating_add(1);
1624
1625 // 1. Tell the target to become master at the new term.
1626 let target_ack = crate::group_admin::send_transfer_master(
1627 target_addr,
1628 &config.target_node,
1629 new_term,
1630 )
1631 .map_err(|e| {
1632 RepError::NetworkError(format!(
1633 "transfer_master: failed to signal target '{}': {}",
1634 config.target_node, e
1635 ))
1636 })?;
1637 if !target_ack {
1638 return Err(RepError::StateError(format!(
1639 "transfer_master: target '{}' rejected the transfer",
1640 config.target_node
1641 )));
1642 }
1643
1644 // 2. Inform all other peers (best-effort).
1645 for peer in self.group_service.get_all_nodes() {
1646 if peer.name == self.config.node_name
1647 || peer.name == config.target_node
1648 {
1649 continue;
1650 }
1651 if let Ok(addr) = format!("{}:{}", peer.host, peer.port).parse() {
1652 let _ = crate::group_admin::send_transfer_master(
1653 addr,
1654 &config.target_node,
1655 new_term,
1656 );
1657 }
1658 }
1659
1660 // 3. Demote self to Replica of the new master.
1661 self.become_replica(&config.target_node)?;
1662
1663 log::info!(
1664 "Node '{}' transferred master to '{}' at term {}",
1665 self.config.node_name.as_str(),
1666 config.target_node,
1667 new_term,
1668 );
1669 Ok(())
1670 }
1671
1672 /// Register a VLSN (as master, after writing a log entry).
1673 ///
1674 /// Maps the given VLSN to the specified log file position. This is called
1675 /// by the master after it writes a replicated log entry.
1676 pub fn register_vlsn(&self, vlsn: u64, file_number: u32, file_offset: u32) {
1677 self.vlsn_index.register(vlsn, file_number, file_offset);
1678 }
1679
1680 /// Replicate a freshly committed log entry from the master.
1681 ///
1682 /// Closes finding F9 of `docs/src/internal/api-audit-2026-05-rep.md`.
1683 ///
1684 /// Combines `register_vlsn` with a push into the in-memory
1685 /// `peer_scanner` so that downstream replicas pulling from this
1686 /// node's `PEER_FEEDER` service (via `catch_up_from_peer`) can
1687 /// stream the entry without round-tripping through the on-disk
1688 /// log. The local log is still the source of truth; the peer
1689 /// scanner is a fast-path cache that bounds itself via
1690 /// `PeerLogScanner::with_capacity` so old entries are evicted.
1691 ///
1692 /// Should be called by the master after the local commit has
1693 /// fsynced. Calling on a non-master is harmless (the peer
1694 /// scanner cache is also used by replicas) but is logged at trace
1695 /// level for diagnostics.
1696 pub fn replicate_entry(
1697 &self,
1698 vlsn: u64,
1699 file_number: u32,
1700 file_offset: u32,
1701 entry_type: u8,
1702 data: Vec<u8>,
1703 ) {
1704 self.vlsn_index.register(vlsn, file_number, file_offset);
1705 self.peer_scanner.push(vlsn, entry_type, data);
1706 if !self.is_master() {
1707 log::trace!(
1708 "replicate_entry called on non-master node '{}': vlsn={}, type={}",
1709 self.config.node_name,
1710 vlsn,
1711 entry_type,
1712 );
1713 }
1714 }
1715
1716 /// Apply a replicated entry (as replica).
1717 ///
1718 /// Applies a log entry received from the master. This is called by the
1719 /// replica stream handler after receiving an entry from the feeder.
1720 ///
1721 /// `data` is the wire-encoded log-record payload. When the
1722 /// replicated environment has not been wired to a local
1723 /// `noxu_db::Environment` (i.e., before `with_environment` is
1724 /// called) the payload is forwarded into the in-memory peer
1725 /// scanner so that downstream replicas attached to the
1726 /// `PEER_FEEDER` service can re-stream it; the local log is **not**
1727 /// updated. This is documented behaviour rather than a stub — see
1728 /// `api-audit-2026-05-rep.md` finding #26 (medium) for the
1729 /// `with_environment`-required local-apply path.
1730 /// cleanup (rep info F35: `_data` placeholder) renames the leading
1731 /// underscore so reviewers don't read it as a TODO.
1732 pub fn apply_entry(
1733 &self,
1734 vlsn: u64,
1735 entry_type: u8,
1736 data: Vec<u8>,
1737 ) -> Result<()> {
1738 if self.is_shutdown() {
1739 return Err(RepError::StateError(
1740 "Cannot apply entry: environment is closed".to_string(),
1741 ));
1742 }
1743
1744 // Register the VLSN in the index.
1745 self.vlsn_index.register(vlsn, 0, 0);
1746
1747 // Push into the peer log scanner so downstream replicas can
1748 // receive this entry via the PEER_FEEDER service.
1749 self.peer_scanner.push(vlsn, entry_type, data);
1750
1751 log::trace!(
1752 "Applied replicated entry: vlsn={}, type={}",
1753 vlsn,
1754 entry_type
1755 );
1756 Ok(())
1757 }
1758
1759 /// Record an ack from a replica (as master).
1760 ///
1761 /// Records that the specified replica has acknowledged processing up to
1762 /// the given VLSN. This is used by the master to track durability
1763 /// guarantees.
1764 pub fn record_ack(&self, vlsn: u64, replica_name: &str) {
1765 self.ack_tracker.record_ack(vlsn, replica_name);
1766 }
1767
1768 /// Set the state change listener.
1769 ///
1770 ///
1771 ///
1772 /// Sets the listener used to receive asynchronous replication node state
1773 /// change events. Note that there is one listener per replication node,
1774 /// not one per handle. Invoking this method adds to the set of listeners.
1775 ///
1776 /// Invoking this method typically results in an immediate callback to the
1777 /// application via the `on_state_change` method, so that the application
1778 /// is made aware of the existing state of the node at the time the listener
1779 /// is first established.
1780 pub fn set_state_change_listener(
1781 &self,
1782 listener: Arc<dyn StateChangeListener>,
1783 ) {
1784 // Immediately notify the listener of the current state
1785 let current_state = self.node_state.get_state();
1786 let event = StateChangeEvent::new(
1787 current_state,
1788 current_state,
1789 self.get_master_name(),
1790 );
1791 listener.on_state_change(event);
1792
1793 let mut listeners = self.listeners.write();
1794 listeners.push(listener);
1795 }
1796
1797 /// Close the replicated environment.
1798 ///
1799 ///
1800 ///
1801 /// Closes this handle and releases any resources. When closed, daemon
1802 /// threads are stopped, even if they are performing work. The node ceases
1803 /// participation in the replication group. If the node was currently the
1804 /// master, the rest of the group will hold an election.
1805 ///
1806 /// The ReplicatedEnvironment should not be closed while any other type of
1807 /// handle that refers to it is not yet closed.
1808 pub fn close(&self) -> Result<()> {
1809 if self.shutdown.swap(true, Ordering::SeqCst) {
1810 // Already closed
1811 return Ok(());
1812 }
1813
1814 let old_state = self.node_state.get_state();
1815
1816 // Transition to Shutdown state. The state machine allows this from
1817 // any non-Shutdown state.
1818 let _ = self.node_state.transition_to(NodeState::Shutdown);
1819
1820 // Notify listeners of the shutdown
1821 self.notify_listeners(old_state, NodeState::Shutdown);
1822
1823 // Clear feeders
1824 {
1825 let mut feeders = self.feeders.write();
1826 feeders.clear();
1827 }
1828
1829 // Signal and join all I/O threads spawned by become_master /
1830 // become_replica / start_vlsn_persistence_daemon. The vlsn-flush
1831 // thread does a final flush on its way out so a clean close is
1832 // recoverable. Closes finding F11.
1833 self.io_shutdown.store(true, Ordering::SeqCst);
1834 {
1835 let mut threads = self.io_threads.lock().unwrap();
1836 for handle in threads.drain(..) {
1837 let _ = handle.join();
1838 }
1839 }
1840
1841 // Belt-and-braces: even when no daemon is running (e.g.
1842 // `ReplicatedEnvironment::new` without `open`), persist a final
1843 // snapshot if env_home is configured.
1844 if let Some(ref home) = self.config.env_home
1845 && let Err(e) =
1846 crate::vlsn::persist::flush_to_disk(&self.vlsn_index, home)
1847 {
1848 log::warn!(
1849 "close: failed to persist VLSN index to {}: {}",
1850 home.display(),
1851 e
1852 );
1853 }
1854
1855 // Stop the TCP service dispatcher (the: serviceDispatcher.shutdown()).
1856 if let Some(ref dispatcher) = self.tcp_dispatcher {
1857 dispatcher.stop();
1858 log::debug!(
1859 "Node '{}' TCP service dispatcher stopped",
1860 self.config.node_name.as_str()
1861 );
1862 }
1863
1864 log::info!(
1865 "Replicated environment '{}' in group '{}' closed",
1866 self.config.node_name.as_str(),
1867 self.config.group_name.as_str()
1868 );
1869
1870 Ok(())
1871 }
1872
1873 /// Close this handle and shut down the Replication Group by forcing all
1874 /// active Replicas to exit.
1875 ///
1876 ///
1877 ///
1878 /// This method must be invoked on the node that's currently the Master
1879 /// after all other outstanding handles have been closed. The Master waits
1880 /// for all active Replicas to catch up so that they have a current set of
1881 /// logs, and then shuts them down.
1882 pub fn shutdown_group(
1883 &self,
1884 replica_shutdown_timeout_ms: u64,
1885 ) -> Result<()> {
1886 if !self.is_master() {
1887 return Err(RepError::InvalidState(
1888 "shutdownGroup must be invoked on the master".to_string(),
1889 ));
1890 }
1891
1892 log::info!(
1893 "Node '{}' shutting down replication group '{}' (replica_timeout={}ms)",
1894 self.config.node_name.as_str(),
1895 self.config.group_name.as_str(),
1896 replica_shutdown_timeout_ms,
1897 );
1898
1899 // Closes finding F8 of `docs/src/internal/api-audit-2026-05-rep.md`.
1900 //
1901 // Send SHUTDOWN_GROUP to every known peer. The recipient calls
1902 // its own `close()` and the per-connection ADMIN handler
1903 // returns ACK_OK. Any peer that doesn't ack within the
1904 // timeout is logged and the master proceeds. After signalling
1905 // every peer, the master closes its own env.
1906 let deadline = std::time::Instant::now()
1907 + Duration::from_millis(replica_shutdown_timeout_ms);
1908
1909 for peer in self.group_service.get_all_nodes() {
1910 if peer.name == self.config.node_name {
1911 continue;
1912 }
1913 // Don't exceed the deadline waiting for any single peer.
1914 let now = std::time::Instant::now();
1915 if now >= deadline {
1916 log::warn!(
1917 "shutdown_group: deadline reached; skipping remaining peers"
1918 );
1919 break;
1920 }
1921 let addr_str = format!("{}:{}", peer.host, peer.port);
1922 let addr = match addr_str.parse::<SocketAddr>() {
1923 Ok(a) => a,
1924 Err(e) => {
1925 log::warn!(
1926 "shutdown_group: peer '{}' has bad address {}: {}",
1927 peer.name,
1928 addr_str,
1929 e
1930 );
1931 continue;
1932 }
1933 };
1934 match crate::group_admin::send_shutdown_group(addr) {
1935 Ok(true) => log::info!(
1936 "shutdown_group: peer '{}' acknowledged",
1937 peer.name
1938 ),
1939 Ok(false) => log::warn!(
1940 "shutdown_group: peer '{}' rejected the request",
1941 peer.name
1942 ),
1943 Err(e) => log::warn!(
1944 "shutdown_group: peer '{}' unreachable: {}",
1945 peer.name,
1946 e
1947 ),
1948 }
1949 }
1950
1951 // Master closes itself last.
1952 self.close()
1953 }
1954
1955 /// Check if shutdown is in progress.
1956 pub fn is_shutdown(&self) -> bool {
1957 self.shutdown.load(Ordering::SeqCst)
1958 }
1959
1960 /// Notify all registered listeners of a state change.
1961 fn notify_listeners(&self, old_state: NodeState, new_state: NodeState) {
1962 let listeners = self.listeners.read();
1963 if !listeners.is_empty() {
1964 let event = StateChangeEvent::new(
1965 old_state,
1966 new_state,
1967 self.get_master_name(),
1968 );
1969 for listener in listeners.iter() {
1970 listener.on_state_change(event.clone());
1971 }
1972 }
1973 }
1974}
1975
1976// ---------------------------------------------------------------------------
1977// F1: ReplicaAckCoordinator impl wires master commits into the AckTracker.
1978// ---------------------------------------------------------------------------
1979//
1980// `noxu_db::Transaction::commit_with_durability` calls
1981// `await_replica_acks` after the local WAL fsync. This impl:
1982//
// 1. Rejects calls during shutdown with `Shutdown`.
// 2. Rejects calls on a non-master node with `NotMaster`.
1985// 3. Computes the required ack count from `electable_count` and the
1986// requested policy.
1987// 4. Allocates a unique commit sequence number, registers the ack
1988// requirement on the `AckTracker`, and polls `is_satisfied` with
1989// a small sleep until either the timeout elapses or the policy
1990// is satisfied.
1991// 5. Cleans up the tracker entry on every exit path.
1992//
1993// Closes finding F1 of `docs/src/internal/api-audit-2026-05-rep.md`.
1994impl ReplicaAckCoordinator for ReplicatedEnvironment {
1995 fn await_replica_acks(
1996 &self,
1997 policy: ReplicaAckPolicyKind,
1998 timeout: Duration,
1999 ) -> std::result::Result<u32, AckWaitError> {
2000 // Fast-path: ReplicaAckPolicy::None never blocks. The trait spec
2001 // says callers may already short-circuit, but be defensive.
2002 if matches!(policy, ReplicaAckPolicyKind::None) {
2003 return Ok(0);
2004 }
2005
2006 if self.is_shutdown() {
2007 return Err(AckWaitError {
2008 kind: AckWaitErrorKind::Shutdown,
2009 needed: 0,
2010 received: 0,
2011 });
2012 }
2013
2014 if !self.is_master() {
2015 return Err(AckWaitError {
2016 kind: AckWaitErrorKind::NotMaster,
2017 needed: 0,
2018 received: 0,
2019 });
2020 }
2021
2022 // Count electable peers (excluding the master) using the
2023 // RepGroup view, which counts Arbiters and Electables
2024 // identically. Only Electable nodes are counted as data
2025 // replicas able to ack a commit. The master itself is
2026 // *implicit*: it is not registered in `group_service` (only
2027 // peers are), so we add 1 to obtain the total electable
2028 // count expected by `ReplicaAckPolicyKind::required_acks`.
2029 let group = self.get_rep_group();
2030 let electable_peers: u32 = group
2031 .get_nodes()
2032 .iter()
2033 .filter(|n| n.node_type == crate::node_type::NodeType::Electable)
2034 .count() as u32;
2035 let electable_count: u32 = electable_peers + 1; // +1 for self/master
2036
2037 let needed = policy.required_acks(electable_count);
2038 if needed == 0 {
2039 // Single-node group, or All with only the master itself.
2040 return Ok(0);
2041 }
2042
2043 let commit_seq = self
2044 .commit_ack_seq
2045 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2046 self.ack_tracker.register(commit_seq, needed);
2047
2048 // Poll-with-sleep loop. The poll interval is small enough that
2049 // late acks satisfy the policy promptly, and large enough that
2050 // a single commit waiting on a slow replica does not spin a
2051 // CPU.
2052 let poll_interval =
2053 std::cmp::min(timeout / 50, Duration::from_millis(20));
2054 let poll_interval = if poll_interval.is_zero() {
2055 Duration::from_millis(1)
2056 } else {
2057 poll_interval
2058 };
2059 let deadline = std::time::Instant::now() + timeout;
2060
2061 loop {
2062 if self.ack_tracker.is_satisfied(commit_seq) {
2063 self.ack_tracker.cleanup_through(commit_seq);
2064 return Ok(needed);
2065 }
2066 if self.is_shutdown() {
2067 self.ack_tracker.cleanup_through(commit_seq);
2068 return Err(AckWaitError {
2069 kind: AckWaitErrorKind::Shutdown,
2070 needed,
2071 received: 0,
2072 });
2073 }
2074 let now = std::time::Instant::now();
2075 if now >= deadline {
2076 // Tear down the registration so it doesn't accumulate;
2077 // record the partial ack count so the caller can report
2078 // a useful `InsufficientReplicas { required, available }`.
2079 let received =
2080 self.ack_tracker.received_count(commit_seq).unwrap_or(0);
2081 self.ack_tracker.cleanup_through(commit_seq);
2082 return Err(AckWaitError {
2083 kind: AckWaitErrorKind::Timeout,
2084 needed,
2085 received,
2086 });
2087 }
2088 let sleep_for = std::cmp::min(
2089 poll_interval,
2090 deadline.saturating_duration_since(now),
2091 );
2092 std::thread::sleep(sleep_for);
2093 }
2094 }
2095
2096 /// X-3: allocate the next VLSN for a recovered XA commit and register
2097 /// `lsn` in the VLSN index so feeders can stream the commit.
2098 ///
2099 /// Increments off the current latest VLSN so the new VLSN is strictly
2100 /// monotonically increasing. In a single-node or master-less environment
2101 /// (not master) returns 0 (NULL_VLSN — harmless, the default).
2102 fn alloc_vlsn_for_recovered_commit(&self, lsn: noxu_util::Lsn) -> u64 {
2103 // Only allocate a VLSN when we are the master; on a replica the
2104 // recovered XA should have been replicated by the original master.
2105 if !self.is_master() {
2106 return 0;
2107 }
2108 let next_vlsn = self.vlsn_index.get_latest_vlsn() + 1;
2109 self.vlsn_index.register(
2110 next_vlsn,
2111 lsn.file_number(),
2112 lsn.file_offset(),
2113 );
2114 log::debug!(
2115 "alloc_vlsn_for_recovered_commit: allocated vlsn={} for lsn={:?}",
2116 next_vlsn,
2117 lsn
2118 );
2119 next_vlsn
2120 }
2121}
2122
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};

    /// Build a config for `node_name` on host "localhost" with the fixed
    /// port 5001 (unit-test style, no real TCP bind needed — the port
    /// might be in use; use `test_config_port0` for real TCP tests).
    fn test_config(node_name: &str) -> RepConfig {
        let (group, host) = ("test_group", "localhost");
        RepConfig::builder(group, node_name, host).node_port(5001).build()
    }

    /// Build a config for `node_name` that binds to an OS-assigned port
    /// on 127.0.0.1.
    fn test_config_port0(node_name: &str) -> RepConfig {
        let (group, host) = ("test_group", "127.0.0.1");
        RepConfig::builder(group, node_name, host).node_port(0).build()
    }

    #[test]
    fn test_initial_state_is_detached() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        // A freshly constructed node starts out Detached ...
        assert_eq!(node.get_state(), NodeState::Detached);
        // ... and therefore holds no role at all.
        assert!(!(node.is_master() || node.is_replica() || node.is_active()));
    }

    #[test]
    fn test_become_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();

        assert_eq!(node.get_state(), NodeState::Master);
        assert!(node.is_master() && node.is_active());
        assert!(!node.is_replica());
    }

    #[test]
    fn test_become_replica() {
        let node = ReplicatedEnvironment::new(test_config("node2")).unwrap();
        node.become_replica("node1").unwrap();

        assert_eq!(node.get_state(), NodeState::Replica);
        assert!(node.is_replica() && node.is_active());
        assert!(!node.is_master());
    }

    #[test]
    fn test_get_node_name() {
        let cfg = test_config("my_node");
        let node = ReplicatedEnvironment::new(cfg).unwrap();
        assert_eq!(node.get_node_name(), "my_node");
    }

    #[test]
    fn test_get_group_name() {
        let cfg = test_config("node1");
        let node = ReplicatedEnvironment::new(cfg).unwrap();
        assert_eq!(node.get_group_name(), "test_group");
    }

    #[test]
    fn test_register_vlsn_updates_index() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        // Three consecutive VLSNs in file 0 at increasing offsets.
        for (vlsn, offset) in [(1, 100), (2, 200), (3, 300)] {
            node.register_vlsn(vlsn, 0, offset);
        }

        assert_eq!(node.get_current_vlsn(), 3);
        let span = node.get_vlsn_range();
        assert_eq!(span.first(), 1);
        assert_eq!(span.last(), 3);
    }

    #[test]
    fn test_record_ack() {
        let master = ReplicatedEnvironment::new(test_config("master")).unwrap();
        master.become_master(1).unwrap();
        master.register_vlsn(1, 0, 100);

        // One ack is required for entry 1; a single replica ack must
        // therefore satisfy it.
        master.get_ack_tracker().register(1, 1);
        master.record_ack(1, "replica1");
        assert!(master.get_ack_tracker().is_satisfied(1));
    }

    #[test]
    fn test_close_sets_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert!(!node.is_shutdown());

        node.close().unwrap();

        // Both the flag and the state machine must reflect the close.
        assert!(node.is_shutdown());
        assert_eq!(node.get_state(), NodeState::Shutdown);
    }

    #[test]
    fn test_close_is_idempotent() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        // The second close must be a harmless no-op.
        node.close().unwrap();
        node.close().unwrap();
        assert!(node.is_shutdown());
    }

    #[test]
    fn test_cannot_become_master_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.close().unwrap();

        let outcome = node.become_master(1);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_cannot_become_replica_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.close().unwrap();

        let outcome = node.become_replica("master");
        assert!(outcome.is_err());
    }

    #[test]
    fn test_cannot_apply_entry_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.close().unwrap();

        let outcome = node.apply_entry(1, 0, vec![1, 2, 3]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_cannot_transfer_master_when_not_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_replica("other").unwrap();

        // A replica is not allowed to hand over mastership.
        let transfer = MasterTransferConfig::new(
            "target_node".to_string(),
            Duration::from_secs(30),
        );
        assert!(node.transfer_master(transfer).is_err());
    }

    #[test]
    fn test_transfer_master_requires_registered_target() {
        // F7: transfer_master is no longer a no-op; it sends an ADMIN
        // TRANSFER_MASTER signal to the target via TCP. An unregistered
        // target is rejected at the address-resolution step.
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();

        let transfer = MasterTransferConfig::new(
            "unknown_target".to_string(),
            Duration::from_secs(30),
        );
        let outcome = node.transfer_master(transfer);
        assert!(
            outcome.is_err(),
            "transfer_master to unregistered target must error"
        );
    }

    #[test]
    fn test_apply_entry_registers_vlsn() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_replica("master").unwrap();

        node.apply_entry(1, 0, vec![1, 2, 3]).unwrap();
        node.apply_entry(2, 0, vec![4, 5, 6]).unwrap();

        // The highest applied VLSN is now the current one.
        assert_eq!(node.get_current_vlsn(), 2);
    }

    #[test]
    fn test_master_name_tracking() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        // No master is known before any role has been taken.
        assert!(node.get_master_name().is_none());

        // Once this node is master it reports itself.
        node.become_master(1).unwrap();
        assert_eq!(node.get_master_name(), Some("node1".to_string()));
    }

    #[test]
    fn test_master_to_replica_transition() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        // Start out as master.
        node.become_master(1).unwrap();
        assert_eq!(node.get_master_name(), Some("node1".to_string()));

        // Master -> Replica is a valid transition and updates the
        // tracked master name.
        node.become_replica("other_master").unwrap();
        assert_eq!(node.get_master_name(), Some("other_master".to_string()));
        assert!(node.is_replica());
    }

    #[test]
    fn test_state_change_listener_notification() {
        /// Counts callbacks and remembers the most recent new state.
        struct TestListener {
            call_count: AtomicU32,
            last_new_state: noxu_sync::Mutex<Option<NodeState>>,
        }

        impl StateChangeListener for TestListener {
            fn on_state_change(&self, event: StateChangeEvent) {
                self.call_count.fetch_add(1, AtomicOrdering::SeqCst);
                let mut latest = self.last_new_state.lock();
                *latest = Some(event.new_state);
            }
        }

        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        let listener = Arc::new(TestListener {
            call_count: AtomicU32::new(0),
            last_new_state: noxu_sync::Mutex::new(None),
        });

        // Registration alone fires one notification.
        node.set_state_change_listener(listener.clone());
        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 1);

        // A real transition fires a second one carrying the new state.
        node.become_master(1).unwrap();
        assert_eq!(listener.call_count.load(AtomicOrdering::SeqCst), 2);
        assert_eq!(*listener.last_new_state.lock(), Some(NodeState::Master));
    }

    #[test]
    fn test_close_notifies_listeners() {
        /// Latches once a transition into `Shutdown` has been observed.
        struct ShutdownListener {
            shutdown_seen: AtomicBool,
        }

        impl StateChangeListener for ShutdownListener {
            fn on_state_change(&self, event: StateChangeEvent) {
                if event.new_state != NodeState::Shutdown {
                    return;
                }
                self.shutdown_seen.store(true, AtomicOrdering::SeqCst);
            }
        }

        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        let listener = Arc::new(ShutdownListener {
            shutdown_seen: AtomicBool::new(false),
        });

        // The initial notification is for the current (Detached) state.
        node.set_state_change_listener(listener.clone());

        // Become master first so the close transition is meaningful.
        node.become_master(1).unwrap();
        assert!(!listener.shutdown_seen.load(AtomicOrdering::SeqCst));

        node.close().unwrap();
        assert!(listener.shutdown_seen.load(AtomicOrdering::SeqCst));
    }

    #[test]
    fn test_shutdown_group_requires_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_replica("other").unwrap();

        // Only the master may shut the group down.
        let outcome = node.shutdown_group(5000);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_shutdown_group_as_master() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();

        let outcome = node.shutdown_group(5000);
        assert!(outcome.is_ok());
        // The master closes itself as the final step.
        assert!(node.is_shutdown());
    }

    #[test]
    fn test_get_config() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        let cfg = node.get_config();
        assert_eq!(cfg.node_name, "node1");
        assert_eq!(cfg.group_name, "test_group");
    }

    #[test]
    fn test_get_stats() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        // Smoke test: fetching stats must not panic.
        let _stats = node.get_stats();
    }

    // -----------------------------------------------------------------------
    // TCP dispatcher tests (H-5 / H-7)
    // -----------------------------------------------------------------------

    #[test]
    fn test_tcp_dispatcher_starts_on_new() {
        // Port 0 lets the OS pick an ephemeral port.
        let node =
            ReplicatedEnvironment::new(test_config_port0("tcp_node")).unwrap();

        // The dispatcher must have started and bound a real port.
        let bound = node.bound_addr();
        assert!(bound.is_some(), "expected a bound address");
        let bound = bound.unwrap();
        assert_ne!(bound.port(), 0, "OS should assign a non-zero port");
    }

    #[test]
    fn test_tcp_dispatcher_stops_on_close() {
        let node =
            ReplicatedEnvironment::new(test_config_port0("tcp_node2")).unwrap();

        // Running right after construction.
        assert!(node.tcp_dispatcher.as_ref().is_some_and(|d| d.is_running()));

        node.close().unwrap();

        // Stopped once the environment has been closed.
        assert!(
            !node.tcp_dispatcher.as_ref().is_some_and(|d| d.is_running()),
            "dispatcher should be stopped after close"
        );
    }

    #[test]
    fn test_tcp_dispatcher_accepts_connection() {
        use crate::net::Channel;
        use crate::net::ServiceHandler;
        use crate::net::service_dispatcher::connect_to_service;

        /// Counts incoming connections and echoes the first message.
        struct PingHandler {
            count: AtomicU32,
        }

        impl ServiceHandler for PingHandler {
            fn service_name(&self) -> &str {
                "ping"
            }

            fn handle(&self, ch: Box<dyn Channel>) -> crate::error::Result<()> {
                self.count.fetch_add(1, AtomicOrdering::SeqCst);
                // Echo the first message back; failures are ignored.
                if let Ok(Some(msg)) = ch.receive(Duration::from_secs(2)) {
                    let _ = ch.send(&msg);
                }
                Ok(())
            }
        }

        let node =
            ReplicatedEnvironment::new(test_config_port0("tcp_node3")).unwrap();
        let addr = node.bound_addr().expect("dispatcher must be bound");

        // Register a ping handler on the running dispatcher.
        if let Some(disp) = node.tcp_dispatcher.as_ref() {
            let handler = Arc::new(PingHandler { count: AtomicU32::new(0) });
            disp.register("ping", handler.clone());

            // Give the accept thread a moment.
            std::thread::sleep(Duration::from_millis(20));

            // Round-trip one message through the service.
            let client = connect_to_service(addr, "ping").unwrap();
            client.send(b"hello").unwrap();
            let echoed = client.receive(Duration::from_secs(2)).unwrap();
            assert_eq!(echoed, Some(b"hello".to_vec()));

            assert_eq!(handler.count.load(AtomicOrdering::SeqCst), 1);
        }

        node.close().unwrap();
    }

    #[test]
    fn test_become_master_auto_transitions_from_detached() {
        // The state machine requires Detached -> Unknown -> Master;
        // become_master() is expected to take the intermediate step itself.
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(node.get_state(), NodeState::Detached);

        node.become_master(1).unwrap();
        assert_eq!(node.get_state(), NodeState::Master);
    }

    #[test]
    fn test_become_replica_auto_transitions_from_detached() {
        // The state machine requires Detached -> Unknown -> Replica;
        // become_replica() is expected to take the intermediate step itself.
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        assert_eq!(node.get_state(), NodeState::Detached);

        node.become_replica("master_node").unwrap();
        assert_eq!(node.get_state(), NodeState::Replica);
    }

    #[test]
    fn test_cannot_transfer_master_when_shutdown() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();
        node.become_master(1).unwrap();
        node.close().unwrap();

        // A closed environment must refuse the transfer.
        let transfer = MasterTransferConfig::new(
            "target".to_string(),
            Duration::from_secs(30),
        );
        assert!(node.transfer_master(transfer).is_err());
    }

    #[test]
    fn test_full_lifecycle() {
        let node = ReplicatedEnvironment::new(test_config("node1")).unwrap();

        // Detached at birth.
        assert_eq!(node.get_state(), NodeState::Detached);

        // Take the master role.
        node.become_master(1).unwrap();
        assert!(node.is_master());

        // Register some VLSNs.
        node.register_vlsn(1, 0, 100);
        node.register_vlsn(2, 0, 200);

        // The same replica acknowledges both of them.
        node.record_ack(1, "replica1");
        node.record_ack(2, "replica1");

        // Simulated failover: step down to replica under a new master.
        node.become_replica("node2").unwrap();
        assert!(node.is_replica());

        // Apply an entry streamed from the new master.
        node.apply_entry(3, 0, vec![7, 8, 9]).unwrap();

        // Finally close.
        node.close().unwrap();
        assert!(node.is_shutdown());
    }

    /// Verify that `with_environment` lazily registers the RESTORE service on
    /// the TCP dispatcher when `config.env_home` was not set at construction.
    ///
    /// This mirrors `RepNode.envSetup()`, which registers the restore handler
    /// when the environment is wired into the replicated node.
    #[test]
    fn test_restore_registered_lazily_via_with_environment() {
        use tempfile::TempDir;

        let home = TempDir::new().expect("temp dir");

        // No env_home in the config: the dispatcher starts, but there is
        // no RESTORE handler yet.
        let cfg = RepConfig::builder("test_group", "node1", "127.0.0.1")
            .node_port(0)
            .build();
        let rep_env = ReplicatedEnvironment::new(cfg).unwrap();
        assert!(!rep_env.restore_registered.load(AtomicOrdering::SeqCst));

        // Wire in a real EnvironmentImpl so get_env_home() returns the
        // temp dir.
        let env_impl = Arc::new(
            EnvironmentImpl::new(home.path(), false, false).expect("open env"),
        );
        rep_env.with_environment(env_impl);

        // Now the RESTORE service must be registered.
        assert!(rep_env.restore_registered.load(AtomicOrdering::SeqCst));
    }

    /// Verify that when `config.env_home` IS set at construction, the RESTORE
    /// service is registered immediately (not deferred).
    #[test]
    fn test_restore_registered_eagerly_when_env_home_in_config() {
        use tempfile::TempDir;

        let home = TempDir::new().expect("temp dir");

        let cfg = RepConfig::builder("test_group", "node2", "127.0.0.1")
            .node_port(0)
            .env_home(home.path())
            .build();
        let rep_env = ReplicatedEnvironment::new(cfg).unwrap();

        // env_home was part of the config, so registration is immediate.
        assert!(rep_env.restore_registered.load(AtomicOrdering::SeqCst));
    }
}