Skip to main content

reddb_server/replication/
mod.rs

//! Replication Module
//!
//! Implements single-primary, multi-replica replication via WAL streaming.
//!
//! # Architecture
//!
//! - Primary: accepts writes and streams WAL records to replicas.
//! - Replica: read-only; connects to the primary for WAL streaming.
//! - Initial sync happens via snapshot transfer, followed by incremental WAL.
//!
//! # Usage
//!
//! ```ignore
//! // Primary
//! let options = RedDBOptions::persistent("./primary-data")
//!     .with_replication(ReplicationConfig::primary());
//!
//! // Replica
//! let options = RedDBOptions::persistent("./replica-data")
//!     .with_replication(ReplicationConfig::replica("http://primary:50051"));
//! ```
22
23pub mod bookmark;
24pub mod cascade;
25pub mod cdc;
26pub mod commit_policy;
27pub mod commit_waiter;
28pub mod control_plane;
29pub mod election;
30pub mod failover;
31pub mod fence;
32pub mod flow_control;
33pub mod lease;
34pub mod logical;
35pub mod primary;
36pub mod quorum;
37pub mod replica;
38pub mod rollback;
39pub mod scheduler;
40pub mod swap_db;
41pub mod topology_advertiser;
42pub mod witness;
43
44pub use bookmark::{BookmarkDecodeError, CausalBookmark};
45pub use cascade::{
46    plan_upstream, CascadeRefusal, CascadeRelay, CascadeUpstream, DownstreamSlot, ReplicaClass,
47    UpstreamChoice,
48};
49pub use commit_policy::CommitPolicy;
50pub use commit_waiter::{AwaitOutcome, CommitWaiter};
51pub use control_plane::{
52    ControlPlaneConsensus, ControlPlaneEntry, ControlPlaneEntryKind, ControlPlaneLogIndex,
53    ControlPlanePayload, ControlPlaneRole, MemberId, ProposeRefusal,
54};
55pub use election::{
56    quorum_threshold, randomized_election_timeout, ElectionCoordinator, ElectionOutcome,
57    ElectionRequest, ElectionTransport, FileLastVoteStore, LastVote, LastVoteError, LastVoteStore,
58    Member, MemberKind, MemoryLastVoteStore, RefusalReason, VoteDecision, VoteRequest, Voter,
59    VotingState,
60};
61pub use failover::{
62    FailoverCoordinator, FailoverError, FailoverMode, FailoverNode, FailoverOutcome,
63    FailoverRequest, FailoverTransport, NodeRole, RoleAssignment,
64};
65pub use fence::{
66    FenceBoundary, FenceVerdict, FileTermStore, MemoryTermStore, StaleTermFenced, TermFence,
67    TermStore, TermStoreError,
68};
69pub use flow_control::{Admission, FlowController};
70pub use lease::{LeaseError, LeaseStore, WriterLease};
71pub use quorum::{QuorumConfig, QuorumCoordinator, QuorumError};
72pub use rollback::{
73    DivergentTail, RollbackCoordinator, RollbackError, RollbackEvent, RollbackOutcome,
74    RollbackPlan, RollbackRequest, RollbackTransport, TailRecord,
75};
76pub use swap_db::{RebootstrapInProgress, SwapDb};
77pub use topology_advertiser::{
78    LagConfig, TopologyAdvertiser, TopologyAuthGate, DEFAULT_REPLICA_TIMEOUT_MS,
79    TOPOLOGY_READ_CAPABILITY,
80};
81pub use witness::{RuntimeProfile, WitnessSupervisor};
82
83pub const DEFAULT_REPLICATION_TERM: u64 = reddb_wire::replication::DEFAULT_REPLICATION_TERM;
84pub const DEFAULT_SLOT_RETENTION_MAX_LAG_LSN: u64 = 100_000;
85pub const DEFAULT_SLOT_IDLE_TIMEOUT_MS: u64 = 86_400_000;
86
/// The part a RedDB instance plays in a replication cluster.
///
/// The default is [`ReplicationRole::Standalone`], i.e. no replication.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum ReplicationRole {
    /// Replication disabled; the instance runs on its own (the default).
    #[default]
    Standalone,
    /// Serves both reads and writes and streams its WAL out to replicas.
    Primary,
    /// Read-only node that receives its WAL from a primary.
    Replica {
        /// gRPC endpoint of the primary to stream from, for example
        /// `"http://primary:50051"`.
        primary_addr: String,
    },
}
101
102/// Configuration for replication.
103#[derive(Debug, Clone)]
104pub struct ReplicationConfig {
105    pub role: ReplicationRole,
106    /// Current replication term/epoch stamped into WAL-derived records.
107    pub term: u64,
108    /// How often replica polls for new WAL records (milliseconds).
109    pub poll_interval_ms: u64,
110    /// Maximum batch size for WAL record transfer.
111    pub max_batch_size: usize,
112    /// Region identifier for this instance (Phase 2.6 multi-region).
113    ///
114    /// Used by the quorum coordinator to spread write acks across
115    /// fault domains: `QuorumConfig::required_regions` forces a commit
116    /// to wait until at least one replica in each listed region has
117    /// acked. Defaults to `"local"` for single-region deployments.
118    pub region: String,
119    /// Quorum configuration (Phase 2.6 multi-region).
120    pub quorum: QuorumConfig,
121    /// Maximum LSN lag a replication slot may pin before the primary
122    /// invalidates it and allows WAL pruning to continue.
123    pub slot_retention_max_lag_lsn: u64,
124    /// Maximum wall-clock idle age for a slot before invalidation.
125    pub slot_idle_timeout_ms: u64,
126    /// Streaming class (issue #838). A [`ReplicaClass::Voting`] node is on the
127    /// durability/election path and always streams directly from the primary;
128    /// a [`ReplicaClass::AsyncReadReplica`] may cascade from an intermediate.
129    /// Defaults to `Voting` — a node only cascades when explicitly declared a
130    /// read-replica.
131    pub replica_class: ReplicaClass,
132    /// Optional intermediate replica to cascade from (issue #838). Honoured
133    /// only for an async read-replica; a voting member refuses it and streams
134    /// directly from the primary. See [`ReplicationConfig::resolved_upstream`].
135    pub cascade_from: Option<CascadeUpstream>,
136}
137
138impl ReplicationConfig {
139    pub fn standalone() -> Self {
140        Self {
141            role: ReplicationRole::Standalone,
142            term: DEFAULT_REPLICATION_TERM,
143            poll_interval_ms: 100,
144            max_batch_size: 1000,
145            region: "local".to_string(),
146            quorum: QuorumConfig::async_commit(),
147            slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
148            slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
149            replica_class: ReplicaClass::Voting,
150            cascade_from: None,
151        }
152    }
153
154    pub fn primary() -> Self {
155        Self {
156            role: ReplicationRole::Primary,
157            term: DEFAULT_REPLICATION_TERM,
158            poll_interval_ms: 100,
159            max_batch_size: 1000,
160            region: "local".to_string(),
161            quorum: QuorumConfig::async_commit(),
162            slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
163            slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
164            replica_class: ReplicaClass::Voting,
165            cascade_from: None,
166        }
167    }
168
169    pub fn replica(primary_addr: impl Into<String>) -> Self {
170        Self {
171            role: ReplicationRole::Replica {
172                primary_addr: primary_addr.into(),
173            },
174            term: DEFAULT_REPLICATION_TERM,
175            poll_interval_ms: 100,
176            max_batch_size: 1000,
177            region: "local".to_string(),
178            quorum: QuorumConfig::async_commit(),
179            slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
180            slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
181            replica_class: ReplicaClass::Voting,
182            cascade_from: None,
183        }
184    }
185
186    /// Attach a quorum configuration (fluent setter).
187    pub fn with_quorum(mut self, quorum: QuorumConfig) -> Self {
188        self.quorum = quorum;
189        self
190    }
191
192    /// Set the region identifier (fluent setter).
193    pub fn with_region(mut self, region: impl Into<String>) -> Self {
194        self.region = region.into();
195        self
196    }
197
198    /// Set the replication term stamped into newly produced records.
199    pub fn with_term(mut self, term: u64) -> Self {
200        self.term = term;
201        self
202    }
203
204    pub fn with_slot_retention_max_lag_lsn(mut self, max_lag_lsn: u64) -> Self {
205        self.slot_retention_max_lag_lsn = max_lag_lsn;
206        self
207    }
208
209    pub fn with_slot_idle_timeout_ms(mut self, timeout_ms: u64) -> Self {
210        self.slot_idle_timeout_ms = timeout_ms;
211        self
212    }
213
214    /// Set the streaming class explicitly (issue #838).
215    pub fn with_replica_class(mut self, class: ReplicaClass) -> Self {
216        self.replica_class = class;
217        self
218    }
219
220    /// Declare this node an async read-replica that cascades from `intermediate`
221    /// (issue #838). Sets [`ReplicaClass::AsyncReadReplica`] and the cascade
222    /// source together — the only combination that actually streams from an
223    /// intermediate. A node left at the default `Voting` class ignores any
224    /// cascade source and streams directly from the primary.
225    pub fn cascading_from(mut self, node_id: impl Into<String>, addr: impl Into<String>) -> Self {
226        self.replica_class = ReplicaClass::AsyncReadReplica;
227        self.cascade_from = Some(CascadeUpstream::new(node_id, addr));
228        self
229    }
230
231    /// Resolve where this node should open its WAL stream, applying the
232    /// cascade policy (issue #838). A voting member always resolves to the
233    /// primary even when a cascade source is configured; the returned
234    /// [`CascadeRefusal`] explains any fallback so it is observable rather
235    /// than silent. `self_node_id` guards against a node cascading from its
236    /// own slot.
237    pub fn resolved_upstream(
238        &self,
239        self_node_id: &str,
240    ) -> (UpstreamChoice, Option<CascadeRefusal>) {
241        plan_upstream(self_node_id, self.replica_class, self.cascade_from.as_ref())
242    }
243}