reddb_server/replication/
mod.rs1pub mod bookmark;
24pub mod cascade;
25pub mod cdc;
26pub mod commit_policy;
27pub mod commit_waiter;
28pub mod control_plane;
29pub mod election;
30pub mod failover;
31pub mod fence;
32pub mod flow_control;
33pub mod lease;
34pub mod logical;
35pub mod primary;
36pub mod quorum;
37pub mod replica;
38pub mod rollback;
39pub mod scheduler;
40pub mod swap_db;
41pub mod topology_advertiser;
42pub mod witness;
43
44pub use bookmark::{BookmarkDecodeError, CausalBookmark};
45pub use cascade::{
46 plan_upstream, CascadeRefusal, CascadeRelay, CascadeUpstream, DownstreamSlot, ReplicaClass,
47 UpstreamChoice,
48};
49pub use commit_policy::CommitPolicy;
50pub use commit_waiter::{AwaitOutcome, CommitWaiter};
51pub use control_plane::{
52 ControlPlaneConsensus, ControlPlaneEntry, ControlPlaneEntryKind, ControlPlaneLogIndex,
53 ControlPlanePayload, ControlPlaneRole, MemberId, ProposeRefusal,
54};
55pub use election::{
56 quorum_threshold, randomized_election_timeout, ElectionCoordinator, ElectionOutcome,
57 ElectionRequest, ElectionTransport, FileLastVoteStore, LastVote, LastVoteError, LastVoteStore,
58 Member, MemberKind, MemoryLastVoteStore, RefusalReason, VoteDecision, VoteRequest, Voter,
59 VotingState,
60};
61pub use failover::{
62 FailoverCoordinator, FailoverError, FailoverMode, FailoverNode, FailoverOutcome,
63 FailoverRequest, FailoverTransport, NodeRole, RoleAssignment,
64};
65pub use fence::{
66 FenceBoundary, FenceVerdict, FileTermStore, MemoryTermStore, StaleTermFenced, TermFence,
67 TermStore, TermStoreError,
68};
69pub use flow_control::{Admission, FlowController};
70pub use lease::{LeaseError, LeaseStore, WriterLease};
71pub use quorum::{QuorumConfig, QuorumCoordinator, QuorumError};
72pub use rollback::{
73 DivergentTail, RollbackCoordinator, RollbackError, RollbackEvent, RollbackOutcome,
74 RollbackPlan, RollbackRequest, RollbackTransport, TailRecord,
75};
76pub use swap_db::{RebootstrapInProgress, SwapDb};
77pub use topology_advertiser::{
78 LagConfig, TopologyAdvertiser, TopologyAuthGate, DEFAULT_REPLICA_TIMEOUT_MS,
79 TOPOLOGY_READ_CAPABILITY,
80};
81pub use witness::{RuntimeProfile, WitnessSupervisor};
82
83pub const DEFAULT_REPLICATION_TERM: u64 = reddb_wire::replication::DEFAULT_REPLICATION_TERM;
/// Default for `ReplicationConfig::slot_retention_max_lag_lsn`, expressed as
/// a count of LSNs.
pub const DEFAULT_SLOT_RETENTION_MAX_LAG_LSN: u64 = 100_000;

/// Default for `ReplicationConfig::slot_idle_timeout_ms`: 24 hours expressed
/// in milliseconds (86_400_000).
pub const DEFAULT_SLOT_IDLE_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1_000;
86
/// Replication role a node is configured with.
///
/// `Standalone` is the `Default` value.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ReplicationRole {
    /// No replication role assigned; this is the default variant.
    #[default]
    Standalone,
    /// The node is configured as a primary.
    Primary,
    /// The node is configured as a replica of another node.
    Replica {
        /// Address of the primary this replica is pointed at.
        // NOTE(review): the expected address format (host:port, URL, ...) is
        // not visible in this file — confirm against the consumer.
        primary_addr: String,
    },
}
101
102#[derive(Debug, Clone)]
104pub struct ReplicationConfig {
105 pub role: ReplicationRole,
106 pub term: u64,
108 pub poll_interval_ms: u64,
110 pub max_batch_size: usize,
112 pub region: String,
119 pub quorum: QuorumConfig,
121 pub slot_retention_max_lag_lsn: u64,
124 pub slot_idle_timeout_ms: u64,
126 pub replica_class: ReplicaClass,
132 pub cascade_from: Option<CascadeUpstream>,
136}
137
138impl ReplicationConfig {
139 pub fn standalone() -> Self {
140 Self {
141 role: ReplicationRole::Standalone,
142 term: DEFAULT_REPLICATION_TERM,
143 poll_interval_ms: 100,
144 max_batch_size: 1000,
145 region: "local".to_string(),
146 quorum: QuorumConfig::async_commit(),
147 slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
148 slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
149 replica_class: ReplicaClass::Voting,
150 cascade_from: None,
151 }
152 }
153
154 pub fn primary() -> Self {
155 Self {
156 role: ReplicationRole::Primary,
157 term: DEFAULT_REPLICATION_TERM,
158 poll_interval_ms: 100,
159 max_batch_size: 1000,
160 region: "local".to_string(),
161 quorum: QuorumConfig::async_commit(),
162 slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
163 slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
164 replica_class: ReplicaClass::Voting,
165 cascade_from: None,
166 }
167 }
168
169 pub fn replica(primary_addr: impl Into<String>) -> Self {
170 Self {
171 role: ReplicationRole::Replica {
172 primary_addr: primary_addr.into(),
173 },
174 term: DEFAULT_REPLICATION_TERM,
175 poll_interval_ms: 100,
176 max_batch_size: 1000,
177 region: "local".to_string(),
178 quorum: QuorumConfig::async_commit(),
179 slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
180 slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
181 replica_class: ReplicaClass::Voting,
182 cascade_from: None,
183 }
184 }
185
186 pub fn with_quorum(mut self, quorum: QuorumConfig) -> Self {
188 self.quorum = quorum;
189 self
190 }
191
192 pub fn with_region(mut self, region: impl Into<String>) -> Self {
194 self.region = region.into();
195 self
196 }
197
198 pub fn with_term(mut self, term: u64) -> Self {
200 self.term = term;
201 self
202 }
203
204 pub fn with_slot_retention_max_lag_lsn(mut self, max_lag_lsn: u64) -> Self {
205 self.slot_retention_max_lag_lsn = max_lag_lsn;
206 self
207 }
208
209 pub fn with_slot_idle_timeout_ms(mut self, timeout_ms: u64) -> Self {
210 self.slot_idle_timeout_ms = timeout_ms;
211 self
212 }
213
214 pub fn with_replica_class(mut self, class: ReplicaClass) -> Self {
216 self.replica_class = class;
217 self
218 }
219
220 pub fn cascading_from(mut self, node_id: impl Into<String>, addr: impl Into<String>) -> Self {
226 self.replica_class = ReplicaClass::AsyncReadReplica;
227 self.cascade_from = Some(CascadeUpstream::new(node_id, addr));
228 self
229 }
230
231 pub fn resolved_upstream(
238 &self,
239 self_node_id: &str,
240 ) -> (UpstreamChoice, Option<CascadeRefusal>) {
241 plan_upstream(self_node_id, self.replica_class, self.cascade_from.as_ref())
242 }
243}