// reddb_server/replication/mod.rs

//! Replication Module
//!
//! Implements single-primary, multi-replica replication via WAL streaming.
//!
//! # Architecture
//!
//! - Primary: accepts writes and streams WAL records to replicas.
//! - Replica: read-only; streams WAL from the primary (or, for an async
//!   read-replica, from a cascading intermediate replica).
//! - Initial sync is done via snapshot transfer, followed by incremental WAL.
//!
//! # Usage
//!
//! ```ignore
//! // Primary
//! let options = RedDBOptions::persistent("./primary-data")
//!     .with_replication(ReplicationConfig::primary());
//!
//! // Replica
//! let options = RedDBOptions::persistent("./replica-data")
//!     .with_replication(ReplicationConfig::replica("http://primary:50051"));
//! ```
22
23pub mod bookmark;
24pub mod cascade;
25pub mod cdc;
26pub mod commit_policy;
27pub mod commit_waiter;
28pub mod election;
29pub mod failover;
30pub mod fence;
31pub mod flow_control;
32pub mod lease;
33pub mod logical;
34pub mod primary;
35pub mod quorum;
36pub mod replica;
37pub mod rollback;
38pub mod scheduler;
39pub mod swap_db;
40pub mod topology_advertiser;
41pub mod witness;
42
43pub use bookmark::{BookmarkDecodeError, CausalBookmark};
44pub use cascade::{
45    plan_upstream, CascadeRefusal, CascadeRelay, CascadeUpstream, DownstreamSlot, ReplicaClass,
46    UpstreamChoice,
47};
48pub use commit_policy::CommitPolicy;
49pub use commit_waiter::{AwaitOutcome, CommitWaiter};
50pub use election::{
51    quorum_threshold, randomized_election_timeout, ElectionCoordinator, ElectionOutcome,
52    ElectionRequest, ElectionTransport, FileLastVoteStore, LastVote, LastVoteError, LastVoteStore,
53    Member, MemberKind, MemoryLastVoteStore, RefusalReason, VoteDecision, VoteRequest, Voter,
54    VotingState,
55};
56pub use failover::{
57    FailoverCoordinator, FailoverError, FailoverMode, FailoverNode, FailoverOutcome,
58    FailoverRequest, FailoverTransport, NodeRole, RoleAssignment,
59};
60pub use fence::{
61    FenceBoundary, FenceVerdict, FileTermStore, MemoryTermStore, StaleTermFenced, TermFence,
62    TermStore, TermStoreError,
63};
64pub use flow_control::{Admission, FlowController};
65pub use lease::{LeaseError, LeaseStore, WriterLease};
66pub use quorum::{QuorumConfig, QuorumCoordinator, QuorumError};
67pub use rollback::{
68    DivergentTail, RollbackCoordinator, RollbackError, RollbackEvent, RollbackOutcome,
69    RollbackPlan, RollbackRequest, RollbackTransport, TailRecord,
70};
71pub use swap_db::{RebootstrapInProgress, SwapDb};
72pub use topology_advertiser::{
73    LagConfig, TopologyAdvertiser, TopologyAuthGate, DEFAULT_REPLICA_TIMEOUT_MS,
74    TOPOLOGY_READ_CAPABILITY,
75};
76pub use witness::{RuntimeProfile, WitnessSupervisor};
77
/// Replication term used by the `ReplicationConfig` constructors
/// (`standalone`, `primary`, `replica`); override it with
/// `ReplicationConfig::with_term`.
pub const DEFAULT_REPLICATION_TERM: u64 = 1;

/// Default for `ReplicationConfig::slot_retention_max_lag_lsn`: how far, in
/// LSNs, a replication slot may lag before the primary invalidates it so that
/// WAL pruning can continue.
pub const DEFAULT_SLOT_RETENTION_MAX_LAG_LSN: u64 = 100_000;

/// Default for `ReplicationConfig::slot_idle_timeout_ms`: 24 hours, expressed
/// in milliseconds.
pub const DEFAULT_SLOT_IDLE_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
81
/// Where this RedDB instance sits in a replication cluster.
///
/// The default is [`ReplicationRole::Standalone`], i.e. replication disabled.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum ReplicationRole {
    /// Single node that takes no part in replication (the default).
    #[default]
    Standalone,
    /// Serves both reads and writes and streams its WAL to replicas.
    Primary,
    /// Read-only node that receives WAL from a primary.
    Replica {
        /// gRPC endpoint of the primary, e.g. `"http://primary:50051"`.
        primary_addr: String,
    },
}
96
97/// Configuration for replication.
98#[derive(Debug, Clone)]
99pub struct ReplicationConfig {
100    pub role: ReplicationRole,
101    /// Current replication term/epoch stamped into WAL-derived records.
102    pub term: u64,
103    /// How often replica polls for new WAL records (milliseconds).
104    pub poll_interval_ms: u64,
105    /// Maximum batch size for WAL record transfer.
106    pub max_batch_size: usize,
107    /// Region identifier for this instance (Phase 2.6 multi-region).
108    ///
109    /// Used by the quorum coordinator to spread write acks across
110    /// fault domains: `QuorumConfig::required_regions` forces a commit
111    /// to wait until at least one replica in each listed region has
112    /// acked. Defaults to `"local"` for single-region deployments.
113    pub region: String,
114    /// Quorum configuration (Phase 2.6 multi-region).
115    pub quorum: QuorumConfig,
116    /// Maximum LSN lag a replication slot may pin before the primary
117    /// invalidates it and allows WAL pruning to continue.
118    pub slot_retention_max_lag_lsn: u64,
119    /// Maximum wall-clock idle age for a slot before invalidation.
120    pub slot_idle_timeout_ms: u64,
121    /// Streaming class (issue #838). A [`ReplicaClass::Voting`] node is on the
122    /// durability/election path and always streams directly from the primary;
123    /// a [`ReplicaClass::AsyncReadReplica`] may cascade from an intermediate.
124    /// Defaults to `Voting` — a node only cascades when explicitly declared a
125    /// read-replica.
126    pub replica_class: ReplicaClass,
127    /// Optional intermediate replica to cascade from (issue #838). Honoured
128    /// only for an async read-replica; a voting member refuses it and streams
129    /// directly from the primary. See [`ReplicationConfig::resolved_upstream`].
130    pub cascade_from: Option<CascadeUpstream>,
131}
132
133impl ReplicationConfig {
134    pub fn standalone() -> Self {
135        Self {
136            role: ReplicationRole::Standalone,
137            term: DEFAULT_REPLICATION_TERM,
138            poll_interval_ms: 100,
139            max_batch_size: 1000,
140            region: "local".to_string(),
141            quorum: QuorumConfig::async_commit(),
142            slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
143            slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
144            replica_class: ReplicaClass::Voting,
145            cascade_from: None,
146        }
147    }
148
149    pub fn primary() -> Self {
150        Self {
151            role: ReplicationRole::Primary,
152            term: DEFAULT_REPLICATION_TERM,
153            poll_interval_ms: 100,
154            max_batch_size: 1000,
155            region: "local".to_string(),
156            quorum: QuorumConfig::async_commit(),
157            slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
158            slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
159            replica_class: ReplicaClass::Voting,
160            cascade_from: None,
161        }
162    }
163
164    pub fn replica(primary_addr: impl Into<String>) -> Self {
165        Self {
166            role: ReplicationRole::Replica {
167                primary_addr: primary_addr.into(),
168            },
169            term: DEFAULT_REPLICATION_TERM,
170            poll_interval_ms: 100,
171            max_batch_size: 1000,
172            region: "local".to_string(),
173            quorum: QuorumConfig::async_commit(),
174            slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
175            slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
176            replica_class: ReplicaClass::Voting,
177            cascade_from: None,
178        }
179    }
180
181    /// Attach a quorum configuration (fluent setter).
182    pub fn with_quorum(mut self, quorum: QuorumConfig) -> Self {
183        self.quorum = quorum;
184        self
185    }
186
187    /// Set the region identifier (fluent setter).
188    pub fn with_region(mut self, region: impl Into<String>) -> Self {
189        self.region = region.into();
190        self
191    }
192
193    /// Set the replication term stamped into newly produced records.
194    pub fn with_term(mut self, term: u64) -> Self {
195        self.term = term;
196        self
197    }
198
199    pub fn with_slot_retention_max_lag_lsn(mut self, max_lag_lsn: u64) -> Self {
200        self.slot_retention_max_lag_lsn = max_lag_lsn;
201        self
202    }
203
204    pub fn with_slot_idle_timeout_ms(mut self, timeout_ms: u64) -> Self {
205        self.slot_idle_timeout_ms = timeout_ms;
206        self
207    }
208
209    /// Set the streaming class explicitly (issue #838).
210    pub fn with_replica_class(mut self, class: ReplicaClass) -> Self {
211        self.replica_class = class;
212        self
213    }
214
215    /// Declare this node an async read-replica that cascades from `intermediate`
216    /// (issue #838). Sets [`ReplicaClass::AsyncReadReplica`] and the cascade
217    /// source together — the only combination that actually streams from an
218    /// intermediate. A node left at the default `Voting` class ignores any
219    /// cascade source and streams directly from the primary.
220    pub fn cascading_from(mut self, node_id: impl Into<String>, addr: impl Into<String>) -> Self {
221        self.replica_class = ReplicaClass::AsyncReadReplica;
222        self.cascade_from = Some(CascadeUpstream::new(node_id, addr));
223        self
224    }
225
226    /// Resolve where this node should open its WAL stream, applying the
227    /// cascade policy (issue #838). A voting member always resolves to the
228    /// primary even when a cascade source is configured; the returned
229    /// [`CascadeRefusal`] explains any fallback so it is observable rather
230    /// than silent. `self_node_id` guards against a node cascading from its
231    /// own slot.
232    pub fn resolved_upstream(
233        &self,
234        self_node_id: &str,
235    ) -> (UpstreamChoice, Option<CascadeRefusal>) {
236        plan_upstream(self_node_id, self.replica_class, self.cascade_from.as_ref())
237    }
238}