Struct ReplicatedEnvironment

pub struct ReplicatedEnvironment { /* private fields */ }

Implementations

impl ReplicatedEnvironment

pub fn new(config: RepConfig) -> Result<Self>

Create a new replicated environment.

Creates a replicated environment handle and starts participating in the replication group. The node’s state is determined when it joins the group, and mastership is not preconfigured. If the group has no current master, creation will trigger an election to determine whether this node will participate as a Master or a Replica.

A brand new node will always join an existing group as a Replica, unless it is the very first electable node that is creating the group. In that case it joins as the Master of the newly formed singleton group.

pub fn open(config: RepConfig) -> Result<Arc<Self>>

Open a replicated environment with the standard production lifecycle.

This is the entry point recommended by the mdBook chapters: it allocates the ReplicatedEnvironment, registers all services on the TCP dispatcher, and spawns the election driver background thread that runs Paxos rounds against known peers until the node has resolved into either Master or Replica state.

Closes finding F6 of the 2026 review.

Use ReplicatedEnvironment::new directly only when the caller plans to drive state transitions explicitly (test harnesses, scripted bootstrap, recovery tooling).

pub fn init_self_weak(self: &Arc<Self>)

Populate the env’s self-referential Weak so background threads can obtain a back-reference for auto-orchestrated follow-up actions (e.g. replica auto-bootstrap on NeedsRestore). Idempotent: subsequent calls are silent no-ops because the inner OnceLock can be set only once.

Callers that wrap the env in Arc and want auto-bootstrap behaviour should call this immediately after construction. Self::open already does so. Test harnesses that drive transitions manually (RepTestBase) also call this so the auto-bootstrap path is exercised in tests.
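The idempotent self-reference described above can be sketched with the standard library's OnceLock and Weak. Env, self_weak and back_reference are hypothetical names for illustration, not this crate's internals.

```rust
use std::sync::{Arc, OnceLock, Weak};

/// Hypothetical stand-in for the replicated environment.
struct Env {
    // Set at most once. A Weak (not an Arc) avoids a reference cycle
    // that would keep the Env alive forever.
    self_weak: OnceLock<Weak<Env>>,
}

impl Env {
    fn new() -> Arc<Self> {
        Arc::new(Env { self_weak: OnceLock::new() })
    }

    /// Idempotent: OnceLock::set returns Err on every call after the
    /// first, and that error is deliberately ignored.
    fn init_self_weak(self: &Arc<Self>) {
        let _ = self.self_weak.set(Arc::downgrade(self));
    }

    /// What a background thread would do to get a strong handle back.
    /// None if init_self_weak was never called or the env was dropped.
    fn back_reference(&self) -> Option<Arc<Env>> {
        self.self_weak.get()?.upgrade()
    }
}
```

Because only a Weak is stored, the env can still be dropped once its last external Arc goes away; background threads then simply fail to upgrade.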

pub fn register_admin_service(self: &Arc<Self>)

Register the ADMIN service handler on the TCP dispatcher.

Closes findings F7 / F8. Holds a Weak<Self> so the handler does not extend the env’s lifetime. Idempotent: re-registering is harmless because TcpServiceDispatcher::register overwrites the existing handler.

pub fn start_vlsn_persistence_daemon(self: &Arc<Self>)

Spawn the VLSN-index persistence daemon (F11).

Periodically (every 2 seconds) snapshots the in-memory VlsnIndex to <env_home>/vlsn.idx so a clean restart can resume from where the replica left off without a full network restore. No-op when config.env_home is None.

Idempotent: only one daemon is ever spawned per env.
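One way to get the "only one daemon per env" guarantee is an AtomicBool claimed with compare_exchange, sketched below. DaemonState and start_persistence_daemon are hypothetical, the VLSN-index snapshot is reduced to a counter, and the period is a parameter (rather than the fixed 2 seconds) so the sketch runs quickly.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical per-env state for the persistence daemon.
struct DaemonState {
    started: AtomicBool,
    shutdown: AtomicBool,
    snapshots: AtomicUsize,
}

/// Spawns the periodic snapshot loop at most once per `state`.
/// Returns None when a daemon was already started (idempotent no-op).
fn start_persistence_daemon(state: &Arc<DaemonState>, period: Duration) -> Option<JoinHandle<()>> {
    // Only the first caller flips false -> true; later callers get Err.
    if state
        .started
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_err()
    {
        return None;
    }
    let st = Arc::clone(state);
    Some(thread::spawn(move || {
        while !st.shutdown.load(Ordering::Acquire) {
            // Stand-in for writing the in-memory VLSN index to <env_home>/vlsn.idx.
            st.snapshots.fetch_add(1, Ordering::Relaxed);
            thread::sleep(period);
        }
    }))
}
```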

pub fn start_election_driver(self: &Arc<Self>)

Spawn the election driver background thread.

While the env is in Detached or Unknown state and no master is known, the driver periodically attempts a Paxos election against peers in GroupService (whose ELECTION services were registered at open() time). On success the driver calls become_master (if this node is the winner) or become_replica (otherwise). On failure (no quorum), the driver waits config.election_timeout and tries again.

The driver respects io_shutdown; on env close the loop exits promptly.

Idempotent: a second call is a no-op (only one driver thread is ever spawned per env).
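The retry loop can be sketched as follows, assuming a closure try_election that stands in for one Paxos round and returns Some(winner) when a quorum is reached and None otherwise. Outcome and drive_election are hypothetical; the real driver calls become_master or become_replica instead of returning a value.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Outcome {
    BecameMaster,
    BecameReplica(String),
    ShutDown,
}

/// Hypothetical driver loop: retry until an election resolves or shutdown is requested.
fn drive_election<F>(
    self_name: &str,
    io_shutdown: &AtomicBool,
    election_timeout: Duration,
    mut try_election: F,
) -> Outcome
where
    F: FnMut() -> Option<String>,
{
    loop {
        // Checked before every round so env close makes the loop exit promptly.
        if io_shutdown.load(Ordering::Acquire) {
            return Outcome::ShutDown;
        }
        match try_election() {
            Some(winner) if winner == self_name => return Outcome::BecameMaster,
            Some(winner) => return Outcome::BecameReplica(winner),
            // No quorum: wait one election timeout, then try again.
            None => thread::sleep(election_timeout),
        }
    }
}
```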

pub fn bound_addr(&self) -> Option<SocketAddr>

Return the socket address the TCP service dispatcher is bound to.

This may differ from the configured node_port when port 0 is used (the OS assigns an ephemeral port). Returns None if the dispatcher could not be started (e.g. the address is not resolvable).
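The ephemeral-port behaviour comes from the operating system rather than from this crate. With the standard library alone, binding to port 0 and then querying the listener shows why the bound address can differ from the configured port. bind_ephemeral is an illustrative helper; TcpListener::bind and local_addr are real std APIs.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener};

/// Binds to an OS-assigned ephemeral port and reports the address actually bound.
fn bind_ephemeral() -> io::Result<(TcpListener, SocketAddr)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    // local_addr() reports the real port the OS picked, never 0.
    let addr = listener.local_addr()?;
    Ok((listener, addr))
}
```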

pub fn with_environment(&self, env: Arc<EnvironmentImpl>)

Wire a live EnvironmentImpl into this replicated environment.

After this call, state transitions (become_master, become_replica) will spawn real feeder/receiver I/O threads backed by the live log.

If the RESTORE service was not registered at construction time (because config.env_home was None), it is registered here using the environment’s actual home path. This mirrors RepNode.envSetup(), which registers the restore handler during environment wiring.

JE HA counterpart: the environment reference wiring, where the EnvironmentImpl is reached via RepImpl.repNode.envImpl.

pub fn get_state(&self) -> NodeState

Get the current node state.

Returns the current state of the node associated with this replication environment. If the caller’s intent is to track the state of the node, StateChangeListener may be a more convenient and efficient approach.

pub fn is_master(&self) -> bool

Check if this node is the master.

Returns true if the node’s current state is Master.

pub fn is_authoritative_master(&self) -> bool

Returns true if this node is an authoritative master (D4, JE ElectionQuorum.isAuthoritativeMaster): it is the group master AND it is still connected to enough replicas that, including itself, a SIMPLE_MAJORITY quorum is present.

A master on the minority side of a network partition is NOT authoritative: it must neither claim the special election ranking (MASTER_RANKING) nor (eventually) continue accepting writes, so that the majority side can elect a fresh master without it competing (split-brain prevention).

The “active replica count” is the number of currently connected push-feeder runners serving electable peers, plus 1 for this master itself. Monitors and Secondaries do not count toward the election quorum.
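Under this definition the check reduces to a simple-majority comparison. The sketch assumes SIMPLE_MAJORITY means strictly more than half of the electable group (electable_count / 2 + 1 votes) and that the caller has already excluded Monitors and Secondaries; both function names are hypothetical.

```rust
/// Hypothetical helper: minimum votes for a simple majority of `electable_count` nodes.
fn simple_majority(electable_count: usize) -> usize {
    electable_count / 2 + 1
}

/// `connected_electable_feeders` counts push-feeder runners serving electable
/// peers only (assumption: Monitors/Secondaries already filtered out).
fn is_authoritative_master(is_master: bool, connected_electable_feeders: usize, electable_count: usize) -> bool {
    // +1 because the master counts itself toward the quorum.
    is_master && connected_electable_feeders + 1 >= simple_majority(electable_count)
}
```

In a five-node group the master needs at least two connected electable replicas; with only one, it is on the minority side and is not authoritative.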

pub fn is_replica(&self) -> bool

Check if this node is a replica.

Returns true if the node’s current state is Replica.

pub fn is_active(&self) -> bool

Returns true if the node is currently participating in the group as a Replica or a Master.

pub fn get_node_name(&self) -> &str

Get the node name.

Returns the unique name used to identify this replicated environment.

pub fn get_group_name(&self) -> &str

Get the group name.

Returns the name of the replication group this node belongs to.

pub fn get_master_name(&self) -> Option<String>

Get the current master (if known).

Returns the name of the node that is currently the master, or None if the master is not known (e.g. the node is in the Unknown or Detached state).

pub fn get_group(&self) -> &GroupService

Get the replication group info.

Returns a description of the replication group as known by this node. The replicated group metadata is stored in a replicated database and updates are propagated by the current master node to all replicas. If this node is not the master, it is possible for its description of the group to be out of date.

pub fn add_peer(&self, node: RepNode) -> Result<()>

Add a peer node to the replication group at runtime.

The node is registered in the GroupService so elections and quorum calculations immediately reflect the new membership.

pub fn remove_peer(&self, name: &str) -> Result<()>

Remove a peer node from the replication group by name.

The node is deregistered from the GroupService. Elections initiated after this call will not include the removed node in quorum calculations.

pub fn update_peer_metadata(&self, name: &str, node: RepNode) -> Result<()>

Update the capacity and latency metadata of an existing peer.

Only the following fields are updated from node:

  • read_capacity_pct
  • write_capacity_pct
  • latency_hint_ms

The node’s identity (name, address, port, node_type) is preserved. Safe to call while replication is active.

If the quorum policy is Flexible or Expression, the quorum system is rebuilt to reflect the new capacity/latency weights.

Note

update_peer_metadata does not currently re-run QuorumPolicy::validate(electable_count) after the metadata change, so an LP-optimal Expression quorum that was safe before the update may no longer satisfy the intersection property afterwards. Until automatic revalidation lands, deployments using QuorumPolicy::Expression should, after every metadata change, call quorum_policy().validate(get_rep_group().electable_count()) on the RepGroup returned by get_rep_group(), and fail the operator-facing operation if validation reports the quorum as unsafe.

pub fn get_rep_group(&self) -> RepGroup

Returns a snapshot of the current replication group as a RepGroup.

The snapshot reflects the state at the time of the call; subsequent add_peer / remove_peer calls are not reflected in it.

pub fn get_config(&self) -> &RepConfig

Get the replication configuration.

Returns the replication configuration that has been used to create this environment.

pub fn get_vlsn_range(&self) -> VlsnRange

Get the current VLSN range on this node.

Returns the range of VLSNs currently available on this node.

pub fn get_current_vlsn(&self) -> u64

Get the latest VLSN.

Returns the most recent VLSN registered on this node.

pub fn replica_stream(&self) -> &ReplicaStream

The replica-side replication stream state (master high-water, applied VLSN, lag). Used by the consistency read-gate to learn the master’s latest known commit VLSN (JE ConsistencyTracker.masterTxnEndVLSN, updated by heartbeats).

pub fn commit_token(&self) -> Option<CommitToken>

REP-10 (B): mint a CommitToken for the most recent commit on this master.

Port of MasterTxn.getCommitToken: returns new CommitToken(envUUID, commitVLSN.getSequence()). A client that just performed a write on the master calls this to obtain the token it will hand to a subsequent replica read (Transaction.getCommitToken). Returns None on a non-master or when no commit VLSN exists yet (JE returns null when commitVLSN.isNull).

The token’s VLSN is the master’s latest assigned VLSN — the same wal_vlsn_counter high-water the ack gate keys on (the commit was logged immediately before this call).

pub fn begin_read_consistency(&self, policy_override: Option<&ConsistencyPolicy>) -> Result<()>

REP-10 (C): the read-gate. Enforce a replica read-consistency policy before a read transaction proceeds.

Port of ReplicaConsistencyPolicy.ensureConsistency as invoked from a replica beginTransaction (RepImpl.checkConsistency / Replica.getConsistencyTracker().awaitVLSN). Called by the replica env’s transaction-begin / read path.

  • policy_override: a per-transaction policy (JE TransactionConfig.setConsistencyPolicy). When None, the node’s configured default is used (ReplicationConfig.setConsistencyPolicy → RepConfig::consistency_policy).

On a master, or when the effective policy is ConsistencyPolicy::NoConsistency, this returns immediately, so existing behaviour is unchanged unless a policy is set. On a replica with a policy other than NoConsistency it BLOCKS until the replica has replayed far enough or the policy timeout expires (a clean RepError, never a hang).
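The blocking read-gate can be sketched with a Mutex and Condvar: the replay path publishes the applied VLSN, and the read path waits until it reaches a target VLSN or the timeout expires. ReplayProgress, publish, await_vlsn and ReadGateError are hypothetical stand-ins; how the target VLSN is derived from a ConsistencyPolicy (for example from a CommitToken or the master's high-water mark) is not modelled here.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Hypothetical error standing in for the clean RepError on timeout.
#[derive(Debug, PartialEq)]
enum ReadGateError {
    Timeout { applied: u64, target: u64 },
}

/// Hypothetical replica-side replay progress shared with the read path.
struct ReplayProgress {
    applied_vlsn: Mutex<u64>,
    advanced: Condvar,
}

impl ReplayProgress {
    fn new() -> Self {
        ReplayProgress { applied_vlsn: Mutex::new(0), advanced: Condvar::new() }
    }

    /// Called by the replay thread after applying an entry; advance-only.
    fn publish(&self, vlsn: u64) {
        let mut applied = self.applied_vlsn.lock().unwrap();
        if vlsn > *applied {
            *applied = vlsn;
            self.advanced.notify_all();
        }
    }

    /// Blocks until applied >= target, or returns an error after `timeout`.
    fn await_vlsn(&self, target: u64, timeout: Duration) -> Result<(), ReadGateError> {
        let guard = self.applied_vlsn.lock().unwrap();
        // wait_timeout_while re-checks the condition on spurious wakeups
        // and keeps track of the remaining time.
        let (guard, result) = self
            .advanced
            .wait_timeout_while(guard, timeout, |applied| *applied < target)
            .unwrap();
        if result.timed_out() {
            return Err(ReadGateError::Timeout { applied: *guard, target });
        }
        Ok(())
    }
}
```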

pub fn syncup_with_feeder(&self, feeder: &dyn SyncupView) -> Result<SyncupAction>

REP-1 STEP 5 (D): run a live syncup against feeder and, if this replica’s tail diverged, ROLL IT BACK to the common matchpoint instead of falling back to a network restore.

Port of the replica’s side of JE ReplicaFeederSyncup.execute: findMatchpoint → verifyRollback → replay.rollback → vlsnIndex.truncateFromTail → resume streaming from matchpoint + 1.

feeder is the master’s crate::stream::syncup::SyncupView (built from its VLSN index, or exchanged over the syncup wire protocol in crate::stream::syncup_protocol). The decision uses the same pure core the protocol drives: find_matchpoint + verify_rollback.

Returns:

  • SyncupAction::RolledBack: the divergent tail was truncated to the matchpoint; resume streaming from start_vlsn. The non-diverged case (matchpoint == last VLSN) returns RolledBack with an empty tail, i.e. a no-op rollback.
  • SyncupAction::NeedsRestore: verify_rollback selected NetworkRestore (no common matchpoint) or HardRecovery (the rollback would cross a committed/aborted txn); the caller must network-restore, as in JE.

The non-diverged fast path (the replica’s range is a prefix of the feeder’s) is still served by the range-check negotiate_syncup (SyncupResult::CanServe) in the streaming path; this method is the DIVERGED case.
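The matchpoint search can be sketched as a backward scan over the replica's log for the highest VLSN at which both sides hold the same entry. In this sketch entries are compared by a hypothetical checksum, Decision only loosely mirrors SyncupAction, and verify_rollback's check for crossing a committed or aborted transaction (the HardRecovery case) is omitted.

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
enum Decision {
    /// Truncate everything after `matchpoint`; resume streaming at matchpoint + 1.
    RollBack { matchpoint: u64, truncated: usize },
    /// No common matchpoint: fall back to a network restore.
    NeedsRestore,
}

/// `replica` and `feeder` map VLSN -> entry checksum (hypothetical entry identity).
fn find_matchpoint(replica: &BTreeMap<u64, u32>, feeder: &BTreeMap<u64, u32>) -> Decision {
    // Walk the replica's log from the tail toward the head.
    for (&vlsn, checksum) in replica.iter().rev() {
        if feeder.get(&vlsn) == Some(checksum) {
            // Everything after the matchpoint is the divergent tail.
            let truncated = replica.range(vlsn + 1..).count();
            return Decision::RollBack { matchpoint: vlsn, truncated };
        }
    }
    Decision::NeedsRestore
}
```

A replica that is a strict prefix of the feeder yields RollBack with an empty tail, matching the "no-op rollback" case above.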

pub fn feeder_replica_names(&self) -> Vec<String>

Return the list of replica names that currently have a Feeder tracker on this (master) node.

Used by tests and operator tooling. The returned list reflects the master’s view at the time of the call; subsequent add_peer/remove_peer calls may change it.

pub fn wal_feeds_served(&self) -> u64

Number of downstream connections this node has served via the JE Feeder/MasterFeederSource mechanism (FeederRunner + EnvironmentLogScanner reading this node’s OWN WAL).

A non-zero value PROVES this node fed a downstream replica by the SAME mechanism the master uses: a cascading replica and the master run the identical PeerFeederService → FeederRunner → EnvironmentLogScanner path (JE FeederManager → Feeder → MasterFeederSource). Used by the chained-replication test to assert that the cascade does NOT use the in-memory pull fallback.

pub fn register_feeder_channel(&self, replica_name: String, channel: Arc<dyn Channel>)

Register a channel for pushing log entries to a specific replica.

When Self::become_master is called — or if the node is already master — a FeederRunner background thread is immediately spawned for this channel. The thread reads from a dedicated in-memory queue that is fed by Self::replicate_entry / Self::apply_entry, and sends framed log entries to the replica over channel. Acks sent back by the replica are visible via Self::active_feeder_runner_acked_vlsn.

Production vs. test use

Production: pass a crate::net::TcpChannel connected to the replica’s inbound feeder service.
Tests: pass one half of a crate::net::LocalChannelPair.

Note on push vs. pull

Registering a channel activates the push path: the master initiates and owns the feeder connection. The existing pull path (PeerFeederService / catch_up_from_peer) continues to operate in parallel for replicas that connect proactively. Do not register a channel for a replica that already connects via the pull path, or entries may be delivered twice.

If become_master was called before registering the channel, call this method afterward; it will spawn the FeederRunner immediately.

pub fn active_feeder_runner_acked_vlsn(&self, replica_name: &str) -> u64

Return the last VLSN acknowledged by the FeederRunner for replica_name.

Returns 0 if no FeederRunner is currently active for that replica (either become_master was not called yet, or no channel was registered). Use this to poll catch-up progress before shutdown.

pub fn bootstrap_via_dispatcher(&self, peer_name: &str) -> Result<()>

Bootstrap this node’s environment by network-restoring all .ndb files from peer_name via the dispatcher’s RESTORE service.

Closes findings F2 / F4 of the 2026 review.

The standalone NetworkRestore::execute() opens raw TCP and expects to drive the legacy NetworkRestoreServer::start listener. Production replicated environments host the RESTORE handler on the dispatcher, so this method routes through execute_via_dispatcher.

peer_name must be a known peer in GroupService; on success the peer’s .ndb files are written into config.env_home. Returns Err if env_home is None, the peer is unknown, or the restore fails for any reason.

pub fn get_stats(&self) -> &RepStats

Get replication statistics.

Returns statistics associated with this environment.

pub fn get_ack_tracker(&self) -> &AckTracker

Get the ack tracker.

pub fn ensure_unknown_state(&self) -> Result<()>

Ensure the node state machine is in Unknown state, transitioning from Detached if necessary. This is needed because the state machine only allows Detached -> Unknown -> Master/Replica.
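The transition rule can be sketched with a plain enum. Only the edges named above (Detached → Unknown, Unknown → Master, Unknown → Replica) are allowed; how ensure_unknown_state behaves when the node is already Master or Replica is not described, so this hypothetical version simply returns an error in that case.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NodeState {
    Detached,
    Unknown,
    Master,
    Replica,
}

/// Only the edges named in the documentation are allowed in this sketch.
fn can_transition(from: NodeState, to: NodeState) -> bool {
    use NodeState::*;
    matches!((from, to), (Detached, Unknown) | (Unknown, Master) | (Unknown, Replica))
}

/// Hypothetical mirror of ensure_unknown_state: a no-op in Unknown,
/// a single step from Detached, and (assumption) an error from any other state.
fn ensure_unknown_state(state: &mut NodeState) -> Result<(), String> {
    match *state {
        NodeState::Unknown => Ok(()),
        s if can_transition(s, NodeState::Unknown) => {
            *state = NodeState::Unknown;
            Ok(())
        }
        s => Err(format!("cannot move from {:?} to Unknown", s)),
    }
}
```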

pub fn become_master(&self, term: u64) -> Result<()>

Transition to master state.

Transitions this node to Master state for the given election term. As master, the node can accept write operations and feed log entries to replicas.

Active push-feeder (C-C2): if feeder channels have been registered via Self::register_feeder_channel before this call, a FeederRunner background thread is spawned per channel.

WAL-scanner auto-feed path (C-C2b, v3.3.0): when Self::with_environment has been called before become_master, each FeederRunner thread uses an EnvironmentLogScanner as its source. Every log_txn_commit on the master writes a VLSN-tagged 22-byte WAL entry (via LogManager::log_with_vlsn); the scanner discovers these entries and streams them to replicas automatically, without any Self::replicate_entry call from the application.

Fallback path: when no EnvironmentImpl is wired, the runner reads from the in-memory queue populated by Self::replicate_entry / Self::apply_entry.

If no feeder channels are registered, this call registers per-replica Feeder tracker structs for AckTracker bookkeeping only. In that case replicas must connect proactively to the PEER_FEEDER pull service to receive entries.

pub fn become_replica(&self, master_name: &str) -> Result<()>

Transition to replica state with the given master.

Transitions this node to Replica state. The node will receive log entries from the specified master.

If a live EnvironmentImpl has been wired in via with_environment, the method prepares an EnvironmentLogWriter so that replicated entries can be written to the local log. The actual network connection is established by the TcpServiceDispatcher; this method logs intent.

In HA.

pub fn transfer_master(&self, config: MasterTransferConfig) -> Result<()>

Initiate a master transfer to the target node.

Transfers the current master state from this node to one of the electable replicas. The replica that is actually chosen to be the new master is the one with which the Master Transfer can be completed most rapidly. The transfer operation ensures that all changes at this node are available at the new master upon conclusion of the operation.

pub fn register_vlsn(&self, vlsn: u64, file_number: u32, file_offset: u32)

Register a VLSN (as master, after writing a log entry).

Maps the given VLSN to the specified log file position. This is called by the master after it writes a replicated log entry.

pub fn register_vlsn_typed(&self, vlsn: u64, file_number: u32, file_offset: u32, entry_type: LogEntryType)

Register a VLSN→LSN mapping with its LogEntryType, so lastSync / lastTxnEnd advance (JE VLSNRange.getUpdateForNewMapping). Used by the syncup driver/tests that apply VLSN-tagged entries to a real log and need the sync/commit boundaries to track the stream.

pub fn replicate_entry(&self, vlsn: u64, file_number: u32, file_offset: u32, entry_type: u8, data: Vec<u8>)

Replicate a freshly committed log entry from the master.

Closes finding F9 of the 2026 review.

Combines register_vlsn with a push into the in-memory peer_scanner so that downstream replicas pulling from this node’s PEER_FEEDER service (via catch_up_from_peer) can stream the entry without round-tripping through the on-disk log. The local log is still the source of truth; the peer scanner is a fast-path cache that bounds itself via PeerLogScanner::with_capacity so old entries are evicted.
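Capacity-bounded eviction of the kind attributed to PeerLogScanner::with_capacity can be sketched with a VecDeque. PeerCache and read_from are hypothetical; in particular, the rule that a reader asking for an evicted VLSN must fall back to the on-disk log is an illustrative assumption based only on the statement that the local log remains the source of truth.

```rust
use std::collections::VecDeque;

/// Hypothetical fast-path cache of recently replicated entries, oldest first.
struct PeerCache {
    capacity: usize,
    entries: VecDeque<(u64, Vec<u8>)>, // (vlsn, payload)
}

impl PeerCache {
    fn with_capacity(capacity: usize) -> Self {
        PeerCache { capacity, entries: VecDeque::with_capacity(capacity) }
    }

    /// Appends an entry, evicting the oldest once the cache is full.
    fn push(&mut self, vlsn: u64, data: Vec<u8>) {
        if self.capacity == 0 {
            return;
        }
        if self.entries.len() == self.capacity {
            self.entries.pop_front();
        }
        self.entries.push_back((vlsn, data));
    }

    /// VLSNs still streamable from the cache starting at `from_vlsn`.
    /// None (assumption): the range was evicted, so read the on-disk log instead.
    fn read_from(&self, from_vlsn: u64) -> Option<Vec<u64>> {
        let oldest = self.entries.front()?.0;
        if from_vlsn < oldest {
            return None;
        }
        Some(self.entries.iter().map(|(v, _)| *v).filter(|v| *v >= from_vlsn).collect())
    }
}
```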

Should be called by the master after the local commit has fsynced. Calling on a non-master is harmless (the peer scanner cache is also used by replicas) but is logged at trace level for diagnostics.

pub fn apply_entry(&self, vlsn: u64, entry_type: u8, data: Vec<u8>) -> Result<()>

Apply a replicated entry (as replica).

Applies a log entry received from the master. This is called by the replica stream handler after receiving an entry from the feeder.

data is the wire-encoded log-record payload. When the replicated environment has not been wired to a local noxu_db::Environment (i.e., before with_environment is called), the payload is forwarded into the in-memory peer scanner so that downstream replicas attached to the PEER_FEEDER service can re-stream it; the local log is not updated. This is documented behaviour rather than a stub; see the 2026 review finding #26 (medium) for the with_environment-required local-apply path. As part of cleanup (rep info F35), the former _data placeholder parameter was renamed to data so reviewers do not read the leading underscore as a TODO.

pub fn record_ack(&self, vlsn: u64, replica_name: &str)

Record an ack from a replica (as master).

Records that the specified replica has acknowledged processing up to the given VLSN. This is used by the master to track durability guarantees.

pub fn get_dtvlsn(&self) -> u64

Returns the current Durable Transaction VLSN (D7, JE RepNode.getDTVLSN). The highest VLSN replicated to a majority of electable replicas; 0 if none yet. Used by the election ranking so the most-durable node wins.

pub fn update_dtvlsn(&self, candidate: u64) -> u64

Advance the DTVLSN to candidate if it is greater (JE RepNode.updateDTVLSN — an AtomicLongMax.updateMax). The DTVLSN can only move forward. Returns the resulting (possibly unchanged) value.

pub fn set_dtvlsn(&self, vlsn: u64)

Set the DTVLSN from the replication stream (JE RepNode.setDTVLSN — used exclusively by the replica, which maintains the DTVLSN from commit/abort records). Still enforced as advance-only via update_max so an out-of-order or stale record cannot move it backward.
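The sketch below shows one way a majority-durable VLSN could be derived from acknowledgements, and why advance-only updates map naturally onto AtomicU64::fetch_max. It assumes the master counts toward the majority (electable_count / 2 + 1 copies needed); majority_vlsn and update_dtvlsn here are hypothetical helpers, not this crate's code.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Highest VLSN held by a simple majority of electable nodes (assumption:
/// the master's own copy counts). Returns 0 if no such VLSN exists yet.
fn majority_vlsn(master_vlsn: u64, replica_acks: &[u64], electable_count: usize) -> u64 {
    let needed = electable_count / 2 + 1;
    let mut held: Vec<u64> = replica_acks.to_vec();
    held.push(master_vlsn);
    // Sorted descending, the needed-th largest value is held by at least `needed` nodes.
    held.sort_unstable_by(|a, b| b.cmp(a));
    held.get(needed - 1).copied().unwrap_or(0)
}

/// Advance-only update in the spirit of update_dtvlsn / set_dtvlsn.
fn update_dtvlsn(dtvlsn: &AtomicU64, candidate: u64) -> u64 {
    // fetch_max returns the previous value; the stored value is the max of both.
    dtvlsn.fetch_max(candidate, Ordering::AcqRel).max(candidate)
}
```

With five electable nodes, a master at VLSN 100 and replica acks of 90, 80, 40 and 10, three nodes hold at least VLSN 80, so 80 is the majority-durable value.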

pub fn set_state_change_listener(&self, listener: Arc<dyn StateChangeListener>)

Set the state change listener.

Sets the listener used to receive asynchronous replication node state change events. Note that there is one listener per replication node, not one per handle. Invoking this method adds to the set of listeners.

Invoking this method typically results in an immediate callback to the application via the on_state_change method, so that the application is made aware of the existing state of the node at the time the listener is first established.
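The registration-time callback can be sketched as follows: the node keeps a list of listeners, and registering one both adds it to the list and immediately reports the current state. Listener and Node are simplified, hypothetical stand-ins; the real StateChangeListener receives a richer state-change event rather than a plain string.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical, simplified stand-in for StateChangeListener.
trait Listener: Send + Sync {
    fn on_state_change(&self, new_state: &str);
}

/// Hypothetical node holding its current state and registered listeners.
struct Node {
    state: Mutex<String>,
    listeners: Mutex<Vec<Arc<dyn Listener>>>,
}

impl Node {
    /// Adds `listener` to the set and immediately reports the existing state to it.
    fn set_state_change_listener(&self, listener: Arc<dyn Listener>) {
        self.listeners.lock().unwrap().push(Arc::clone(&listener));
        let current = self.state.lock().unwrap().clone();
        listener.on_state_change(&current);
    }

    /// Records a new state and notifies every registered listener.
    fn transition(&self, new_state: &str) {
        *self.state.lock().unwrap() = new_state.to_string();
        // Clone the list first so no lock is held while callbacks run.
        let listeners = self.listeners.lock().unwrap().clone();
        for l in &listeners {
            l.on_state_change(new_state);
        }
    }
}
```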

pub fn close(&self) -> Result<()>

Close the replicated environment.

Closes this handle and releases any resources. When closed, daemon threads are stopped, even if they are performing work. The node ceases participation in the replication group. If the node was currently the master, the rest of the group will hold an election.

The ReplicatedEnvironment should not be closed until every other handle that refers to it has been closed.

pub fn shutdown_group(&self, replica_shutdown_timeout_ms: u64) -> Result<()>

Close this handle and shut down the Replication Group by forcing all active Replicas to exit.

This method must be invoked on the node that’s currently the Master after all other outstanding handles have been closed.

When push-feeder threads are active (registered via Self::register_feeder_channel), the master first waits up to half of replica_shutdown_timeout_ms for each FeederRunner replica to acknowledge all outstanding log entries (VLSN catch-up). Replicas that do not catch up within the budget receive a warning; the master proceeds to send SHUTDOWN_GROUP regardless. This closes finding M-4 of the v3.x production-readiness review.

Replicas that are not fed via a registered channel (pull-based PeerFeederService path) are sent SHUTDOWN_GROUP without a VLSN-level catch-up wait — that wait requires per-replica ack tracking which the pull path does not yet provide.
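The catch-up phase for push-fed replicas can be sketched as a polling loop with a deadline of half the shutdown timeout. The acked_vlsn closure is a hypothetical stand-in for active_feeder_runner_acked_vlsn, and reading "for each FeederRunner replica" as a fresh half-budget per replica is an assumption.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Waits for each replica to ack `target_vlsn`, giving each one at most half of
/// `replica_shutdown_timeout_ms`. Returns the replicas that did not catch up;
/// the caller warns about them and sends SHUTDOWN_GROUP regardless.
fn wait_for_catch_up<F>(
    replicas: &[&str],
    target_vlsn: u64,
    replica_shutdown_timeout_ms: u64,
    mut acked_vlsn: F,
) -> Vec<String>
where
    F: FnMut(&str) -> u64,
{
    let budget = Duration::from_millis(replica_shutdown_timeout_ms / 2);
    let mut laggards = Vec::new();
    for &name in replicas {
        let deadline = Instant::now() + budget;
        // Poll the last acked VLSN until it reaches the target or time runs out.
        while acked_vlsn(name) < target_vlsn {
            if Instant::now() >= deadline {
                laggards.push(name.to_string());
                break;
            }
            thread::sleep(Duration::from_millis(1));
        }
    }
    laggards
}
```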

pub fn is_shutdown(&self) -> bool

Check if shutdown is in progress.

Trait Implementations

impl ReplicaAckCoordinator for ReplicatedEnvironment

fn alloc_vlsn_for_recovered_commit(&self, lsn: Lsn) -> u64

X-3: allocate the next VLSN for a recovered XA commit and register lsn in the VLSN index so feeders can stream the commit.

The new VLSN is the current latest VLSN plus one, so allocated VLSNs are strictly monotonically increasing. When this node is not the master (e.g. in a single-node or master-less environment), it returns 0 (NULL_VLSN), which is harmless and the default.

fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64

R-3: pre-allocate the next commit VLSN WITHOUT registering in the index.

The caller writes the TxnCommit WAL entry with this VLSN embedded, then calls register_recovered_commit_vlsn with the actual commit LSN. This two-step approach ensures the WAL entry carries the VLSN so the X-14 VLSN rebuild on second crash can find it.

fn register_recovered_commit_vlsn(&self, vlsn: u64, commit_lsn: Lsn)

R-3: register a pre-allocated VLSN in the VLSN index with the actual commit LSN. Called after writing the TxnCommit WAL entry.
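The R-3 two-step protocol can be sketched with an atomic counter for allocation and a map standing in for the VLSN index. VlsnAllocator and the Lsn alias are hypothetical, and applying the X-3 rule (return NULL_VLSN when not master) to the pre-allocation step is an assumption.

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

type Lsn = u64; // hypothetical: the real Lsn type is not shown here
const NULL_VLSN: u64 = 0;

struct VlsnAllocator {
    is_master: bool,
    latest_vlsn: AtomicU64,
    index: Mutex<BTreeMap<u64, Lsn>>, // VLSN -> commit LSN
}

impl VlsnAllocator {
    /// Step 1: reserve the next VLSN without touching the index.
    fn pre_alloc_vlsn(&self) -> u64 {
        if !self.is_master {
            return NULL_VLSN; // assumption: same non-master rule as X-3
        }
        // fetch_add returns the previous value, so +1 is a fresh, strictly larger VLSN.
        self.latest_vlsn.fetch_add(1, Ordering::AcqRel) + 1
    }

    /// Step 2: once the TxnCommit entry carrying `vlsn` is written, map it to its LSN.
    fn register_commit_vlsn(&self, vlsn: u64, commit_lsn: Lsn) {
        if vlsn != NULL_VLSN {
            self.index.lock().unwrap().insert(vlsn, commit_lsn);
        }
    }
}
```

Splitting allocation from registration lets the VLSN be embedded in the WAL entry before its LSN is known, which is what allows a later VLSN rebuild to find it.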

fn await_replica_acks(&self, policy: ReplicaAckPolicyKind, timeout: Duration) -> Result<u32, AckWaitError>

Block until at least policy.required_acks(electable_count) replicas have acknowledged the most recent local commit, or until timeout elapses, whichever comes first.
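A sketch of the wait, assuming the ack count for the most recent commit is published through a Mutex/Condvar pair and that electable_count includes the master, so SIMPLE_MAJORITY needs electable_count / 2 replica acks on top of the master's own vote. AckPolicy and AckWaiter are hypothetical stand-ins for ReplicaAckPolicyKind and this crate's internals, and a plain Err(count) replaces AckWaitError.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

#[derive(Clone, Copy)]
enum AckPolicy {
    None,
    SimpleMajority,
    All,
}

impl AckPolicy {
    /// Replica acks needed, assuming `electable_count` includes the master itself.
    fn required_acks(self, electable_count: u32) -> u32 {
        match self {
            AckPolicy::None => 0,
            // Majority of the group (count / 2 + 1) minus the master's own vote.
            AckPolicy::SimpleMajority => electable_count / 2,
            AckPolicy::All => electable_count.saturating_sub(1),
        }
    }
}

struct AckWaiter {
    acks: Mutex<u32>,
    changed: Condvar,
}

impl AckWaiter {
    fn record_ack(&self) {
        *self.acks.lock().unwrap() += 1;
        self.changed.notify_all();
    }

    /// Ok(acks) once enough replicas acked; Err(acks seen so far) on timeout.
    fn await_acks(&self, policy: AckPolicy, electable_count: u32, timeout: Duration) -> Result<u32, u32> {
        let needed = policy.required_acks(electable_count);
        let guard = self.acks.lock().unwrap();
        let (guard, res) = self
            .changed
            .wait_timeout_while(guard, timeout, |n| *n < needed)
            .unwrap();
        if res.timed_out() { Err(*guard) } else { Ok(*guard) }
    }
}
```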

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V