Skip to main content

reddb_server/replication/
mod.rs

//! Replication Module
//!
//! Implements single-primary, multi-replica replication via WAL streaming.
//!
//! # Architecture
//!
//! - Primary: accepts reads and writes and streams WAL records to replicas.
//! - Replica: read-only; connects to the primary for WAL streaming.
//! - Initial sync happens via snapshot transfer, followed by incremental WAL streaming.
//!
//! # Usage
//!
//! ```ignore
//! // Primary
//! let options = RedDBOptions::persistent("./primary-data")
//!     .with_replication(ReplicationConfig::primary());
//!
//! // Replica
//! let options = RedDBOptions::persistent("./replica-data")
//!     .with_replication(ReplicationConfig::replica("http://primary:50051"));
//! ```
22
23pub mod bookmark;
24pub mod cdc;
25pub mod commit_policy;
26pub mod commit_waiter;
27pub mod failover;
28pub mod flow_control;
29pub mod lease;
30pub mod logical;
31pub mod primary;
32pub mod quorum;
33pub mod replica;
34pub mod scheduler;
35pub mod swap_db;
36pub mod topology_advertiser;
37
38pub use bookmark::{BookmarkDecodeError, CausalBookmark};
39pub use commit_policy::CommitPolicy;
40pub use commit_waiter::{AwaitOutcome, CommitWaiter};
41pub use failover::{
42    FailoverCoordinator, FailoverError, FailoverMode, FailoverNode, FailoverOutcome,
43    FailoverRequest, FailoverTransport, NodeRole, RoleAssignment,
44};
45pub use flow_control::{Admission, FlowController};
46pub use lease::{LeaseError, LeaseStore, WriterLease};
47pub use quorum::{QuorumConfig, QuorumCoordinator, QuorumError};
48pub use swap_db::{RebootstrapInProgress, SwapDb};
49pub use topology_advertiser::{
50    LagConfig, TopologyAdvertiser, TopologyAuthGate, DEFAULT_REPLICA_TIMEOUT_MS,
51    TOPOLOGY_READ_CAPABILITY,
52};
53
/// Replication term that newly constructed [`ReplicationConfig`]s start with.
pub const DEFAULT_REPLICATION_TERM: u64 = 1;
/// Default LSN lag a replication slot may pin before the primary invalidates it.
pub const DEFAULT_SLOT_RETENTION_MAX_LAG_LSN: u64 = 100_000;
/// Default idle age before a replication slot is invalidated: 24 hours, in milliseconds.
pub const DEFAULT_SLOT_IDLE_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1_000;
57
/// Where this RedDB instance sits in a replication cluster.
///
/// The default is [`ReplicationRole::Standalone`], i.e. replication disabled.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum ReplicationRole {
    /// Not part of any cluster; replication is off. This is the default role.
    #[default]
    Standalone,
    /// Serves reads and writes and streams its WAL to replicas.
    Primary,
    /// Read-only follower that receives the WAL from a primary.
    Replica {
        /// gRPC endpoint of the primary to follow, e.g. `"http://primary:50051"`.
        primary_addr: String,
    },
}
72
73/// Configuration for replication.
74#[derive(Debug, Clone)]
75pub struct ReplicationConfig {
76    pub role: ReplicationRole,
77    /// Current replication term/epoch stamped into WAL-derived records.
78    pub term: u64,
79    /// How often replica polls for new WAL records (milliseconds).
80    pub poll_interval_ms: u64,
81    /// Maximum batch size for WAL record transfer.
82    pub max_batch_size: usize,
83    /// Region identifier for this instance (Phase 2.6 multi-region).
84    ///
85    /// Used by the quorum coordinator to spread write acks across
86    /// fault domains: `QuorumConfig::required_regions` forces a commit
87    /// to wait until at least one replica in each listed region has
88    /// acked. Defaults to `"local"` for single-region deployments.
89    pub region: String,
90    /// Quorum configuration (Phase 2.6 multi-region).
91    pub quorum: QuorumConfig,
92    /// Maximum LSN lag a replication slot may pin before the primary
93    /// invalidates it and allows WAL pruning to continue.
94    pub slot_retention_max_lag_lsn: u64,
95    /// Maximum wall-clock idle age for a slot before invalidation.
96    pub slot_idle_timeout_ms: u64,
97}
98
99impl ReplicationConfig {
100    pub fn standalone() -> Self {
101        Self {
102            role: ReplicationRole::Standalone,
103            term: DEFAULT_REPLICATION_TERM,
104            poll_interval_ms: 100,
105            max_batch_size: 1000,
106            region: "local".to_string(),
107            quorum: QuorumConfig::async_commit(),
108            slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
109            slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
110        }
111    }
112
113    pub fn primary() -> Self {
114        Self {
115            role: ReplicationRole::Primary,
116            term: DEFAULT_REPLICATION_TERM,
117            poll_interval_ms: 100,
118            max_batch_size: 1000,
119            region: "local".to_string(),
120            quorum: QuorumConfig::async_commit(),
121            slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
122            slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
123        }
124    }
125
126    pub fn replica(primary_addr: impl Into<String>) -> Self {
127        Self {
128            role: ReplicationRole::Replica {
129                primary_addr: primary_addr.into(),
130            },
131            term: DEFAULT_REPLICATION_TERM,
132            poll_interval_ms: 100,
133            max_batch_size: 1000,
134            region: "local".to_string(),
135            quorum: QuorumConfig::async_commit(),
136            slot_retention_max_lag_lsn: DEFAULT_SLOT_RETENTION_MAX_LAG_LSN,
137            slot_idle_timeout_ms: DEFAULT_SLOT_IDLE_TIMEOUT_MS,
138        }
139    }
140
141    /// Attach a quorum configuration (fluent setter).
142    pub fn with_quorum(mut self, quorum: QuorumConfig) -> Self {
143        self.quorum = quorum;
144        self
145    }
146
147    /// Set the region identifier (fluent setter).
148    pub fn with_region(mut self, region: impl Into<String>) -> Self {
149        self.region = region.into();
150        self
151    }
152
153    /// Set the replication term stamped into newly produced records.
154    pub fn with_term(mut self, term: u64) -> Self {
155        self.term = term;
156        self
157    }
158
159    pub fn with_slot_retention_max_lag_lsn(mut self, max_lag_lsn: u64) -> Self {
160        self.slot_retention_max_lag_lsn = max_lag_lsn;
161        self
162    }
163
164    pub fn with_slot_idle_timeout_ms(mut self, timeout_ms: u64) -> Self {
165        self.slot_idle_timeout_ms = timeout_ms;
166        self
167    }
168}