reddb_server/replication/
mod.rs1pub mod bookmark;
24pub mod cascade;
25pub mod cdc;
26pub mod commit_policy;
27pub mod commit_waiter;
28pub mod election;
29pub mod failover;
30pub mod fence;
31pub mod flow_control;
32pub mod lease;
33pub mod logical;
34pub mod primary;
35pub mod quorum;
36pub mod replica;
37pub mod rollback;
38pub mod scheduler;
39pub mod swap_db;
40pub mod topology_advertiser;
41pub mod witness;
42
43pub use bookmark::{BookmarkDecodeError, CausalBookmark};
44pub use cascade::{
45 plan_upstream, CascadeRefusal, CascadeRelay, CascadeUpstream, DownstreamSlot, ReplicaClass,
46 UpstreamChoice,
47};
48pub use commit_policy::CommitPolicy;
49pub use commit_waiter::{AwaitOutcome, CommitWaiter};
50pub use election::{
51 quorum_threshold, randomized_election_timeout, ElectionCoordinator, ElectionOutcome,
52 ElectionRequest, ElectionTransport, FileLastVoteStore, LastVote, LastVoteError, LastVoteStore,
53 Member, MemberKind, MemoryLastVoteStore, RefusalReason, VoteDecision, VoteRequest, Voter,
54 VotingState,
55};
56pub use failover::{
57 FailoverCoordinator, FailoverError, FailoverMode, FailoverNode, FailoverOutcome,
58 FailoverRequest, FailoverTransport, NodeRole, RoleAssignment,
59};
60pub use fence::{
61 FenceBoundary, FenceVerdict, FileTermStore, MemoryTermStore, StaleTermFenced, TermFence,
62 TermStore, TermStoreError,
63};
64pub use flow_control::{Admission, FlowController};
65pub use lease::{LeaseError, LeaseStore, WriterLease};
66pub use quorum::{QuorumConfig, QuorumCoordinator, QuorumError};
67pub use rollback::{
68 DivergentTail, RollbackCoordinator, RollbackError, RollbackEvent, RollbackOutcome,
69 RollbackPlan, RollbackRequest, RollbackTransport, TailRecord,
70};
71pub use swap_db::{RebootstrapInProgress, SwapDb};
72pub use topology_advertiser::{
73 LagConfig, TopologyAdvertiser, TopologyAuthGate, DEFAULT_REPLICA_TIMEOUT_MS,
74 TOPOLOGY_READ_CAPABILITY,
75};
76pub use witness::{RuntimeProfile, WitnessSupervisor};
77
78pub const DEFAULT_REPLICATION_TERM: u64 = 1;
79pub const DEFAULT_SLOT_RETENTION_MAX_LAG_LSN: u64 = 100_000;
80pub const DEFAULT_SLOT_IDLE_TIMEOUT_MS: u64 = 86_400_000;
81
82#[derive(Debug, Clone, PartialEq, Eq, Default)]
84pub enum ReplicationRole {
85 #[default]
87 Standalone,
88 Primary,
90 Replica {
92 primary_addr: String,
94 },
95}
96
97#[derive(Debug, Clone)]
99pub struct ReplicationConfig {
100 pub role: ReplicationRole,
101 pub term: u64,
103 pub poll_interval_ms: u64,
105 pub max_batch_size: usize,
107 pub region: String,
114 pub quorum: QuorumConfig,
116 pub slot_retention_max_lag_lsn: u64,
119 pub slot_idle_timeout_ms: u64,
121 pub replica_class: ReplicaClass,
127 pub cascade_from: Option<CascadeUpstream>,
131}
132
133impl ReplicationConfig {
134 pub fn standalone() -> Self {
135 Self {
136 role: ReplicationRole::Standalone,
137 term: DEFAULT_REPLICATION_TERM,
138 poll_interval_ms: 100,
139 max_batch_size: 1000,
140 region: "local".to_string(),
141 quorum: QuorumConfig::async_commit(),
142 slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
143 slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
144 replica_class: ReplicaClass::Voting,
145 cascade_from: None,
146 }
147 }
148
149 pub fn primary() -> Self {
150 Self {
151 role: ReplicationRole::Primary,
152 term: DEFAULT_REPLICATION_TERM,
153 poll_interval_ms: 100,
154 max_batch_size: 1000,
155 region: "local".to_string(),
156 quorum: QuorumConfig::async_commit(),
157 slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
158 slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
159 replica_class: ReplicaClass::Voting,
160 cascade_from: None,
161 }
162 }
163
164 pub fn replica(primary_addr: impl Into<String>) -> Self {
165 Self {
166 role: ReplicationRole::Replica {
167 primary_addr: primary_addr.into(),
168 },
169 term: DEFAULT_REPLICATION_TERM,
170 poll_interval_ms: 100,
171 max_batch_size: 1000,
172 region: "local".to_string(),
173 quorum: QuorumConfig::async_commit(),
174 slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
175 slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
176 replica_class: ReplicaClass::Voting,
177 cascade_from: None,
178 }
179 }
180
181 pub fn with_quorum(mut self, quorum: QuorumConfig) -> Self {
183 self.quorum = quorum;
184 self
185 }
186
187 pub fn with_region(mut self, region: impl Into<String>) -> Self {
189 self.region = region.into();
190 self
191 }
192
193 pub fn with_term(mut self, term: u64) -> Self {
195 self.term = term;
196 self
197 }
198
199 pub fn with_slot_retention_max_lag_lsn(mut self, max_lag_lsn: u64) -> Self {
200 self.slot_retention_max_lag_lsn = max_lag_lsn;
201 self
202 }
203
204 pub fn with_slot_idle_timeout_ms(mut self, timeout_ms: u64) -> Self {
205 self.slot_idle_timeout_ms = timeout_ms;
206 self
207 }
208
209 pub fn with_replica_class(mut self, class: ReplicaClass) -> Self {
211 self.replica_class = class;
212 self
213 }
214
215 pub fn cascading_from(mut self, node_id: impl Into<String>, addr: impl Into<String>) -> Self {
221 self.replica_class = ReplicaClass::AsyncReadReplica;
222 self.cascade_from = Some(CascadeUpstream::new(node_id, addr));
223 self
224 }
225
226 pub fn resolved_upstream(
233 &self,
234 self_node_id: &str,
235 ) -> (UpstreamChoice, Option<CascadeRefusal>) {
236 plan_upstream(self_node_id, self.replica_class, self.cascade_from.as_ref())
237 }
238}