ormdb_server/replication/
manager.rs1use std::sync::Arc;
4
5use tokio::sync::RwLock;
6use tracing::{debug, info};
7
8use ormdb_core::replication::{ChangeLog, ReplicaApplier};
9use ormdb_proto::replication::{ReplicationRole, ReplicationStatus, StreamChangesResponse};
10
11pub struct ReplicationManager {
19 role: RwLock<ReplicationRole>,
21 changelog: Arc<ChangeLog>,
23 applier: Option<Arc<ReplicaApplier>>,
25}
26
27impl ReplicationManager {
28 pub fn new_primary(changelog: Arc<ChangeLog>) -> Self {
30 info!("Initializing replication manager in primary mode");
31 Self {
32 role: RwLock::new(ReplicationRole::Primary),
33 changelog,
34 applier: None,
35 }
36 }
37
38 pub fn new_standalone(changelog: Arc<ChangeLog>) -> Self {
40 info!("Initializing replication manager in standalone mode");
41 Self {
42 role: RwLock::new(ReplicationRole::Standalone),
43 changelog,
44 applier: None,
45 }
46 }
47
48 pub fn new_replica(
50 changelog: Arc<ChangeLog>,
51 applier: Arc<ReplicaApplier>,
52 primary_addr: String,
53 ) -> Self {
54 info!(primary = %primary_addr, "Initializing replication manager in replica mode");
55 Self {
56 role: RwLock::new(ReplicationRole::Replica { primary_addr }),
57 changelog,
58 applier: Some(applier),
59 }
60 }
61
62 pub async fn can_write(&self) -> bool {
64 let role = self.role.read().await;
65 role.can_write()
66 }
67
68 pub async fn role(&self) -> ReplicationRole {
70 self.role.read().await.clone()
71 }
72
73 pub async fn is_replica(&self) -> bool {
75 let role = self.role.read().await;
76 role.is_replica()
77 }
78
79 pub async fn is_primary(&self) -> bool {
81 let role = self.role.read().await;
82 role.is_primary()
83 }
84
85 pub async fn status(&self) -> ReplicationStatus {
87 let role = self.role.read().await.clone();
88 let current_lsn = self.changelog.current_lsn();
89
90 let (lag_entries, lag_ms) = match &role {
91 ReplicationRole::Replica { .. } => {
92 if let Some(ref applier) = self.applier {
93 let applied = applier.applied_lsn();
94 let lag = current_lsn.saturating_sub(applied);
95 (lag, 0)
97 } else {
98 (0, 0)
99 }
100 }
101 _ => (0, 0),
102 };
103
104 ReplicationStatus {
105 role,
106 current_lsn,
107 lag_entries,
108 lag_ms,
109 }
110 }
111
112 pub fn current_lsn(&self) -> u64 {
114 self.changelog.current_lsn()
115 }
116
117 pub fn stream_changes(&self, from_lsn: u64, batch_size: u32) -> StreamChangesResponse {
119 let (entries, has_more) = self
120 .changelog
121 .scan_batch(from_lsn, batch_size as usize)
122 .unwrap_or_else(|e| {
123 debug!(error = %e, "failed to scan changelog");
124 (vec![], false)
125 });
126
127 let next_lsn = entries.last().map(|e| e.lsn + 1).unwrap_or(from_lsn);
128
129 StreamChangesResponse::new(entries, next_lsn, has_more)
130 }
131
132 pub fn stream_changes_filtered(
134 &self,
135 from_lsn: u64,
136 batch_size: u32,
137 entity_filter: Option<&[String]>,
138 ) -> StreamChangesResponse {
139 let (entries, has_more) = self
140 .changelog
141 .scan_filtered(from_lsn, batch_size as usize, entity_filter)
142 .unwrap_or_else(|e| {
143 debug!(error = %e, "failed to scan changelog");
144 (vec![], false)
145 });
146
147 let next_lsn = entries.last().map(|e| e.lsn + 1).unwrap_or(from_lsn);
148
149 StreamChangesResponse::new(entries, next_lsn, has_more)
150 }
151
152 pub fn applied_lsn(&self) -> Option<u64> {
154 self.applier.as_ref().map(|a| a.applied_lsn())
155 }
156
157 pub fn changelog(&self) -> &ChangeLog {
159 &self.changelog
160 }
161
162 pub async fn promote_to_primary(&self) {
166 let mut role = self.role.write().await;
167 if role.is_replica() {
168 info!("Promoting server from replica to primary");
169 *role = ReplicationRole::Primary;
170 }
171 }
172
173 pub async fn demote_to_replica(&self, primary_addr: String) {
177 let mut role = self.role.write().await;
178 if !role.is_replica() {
179 info!(primary = %primary_addr, "Demoting server to replica");
180 *role = ReplicationRole::Replica { primary_addr };
181 }
182 }
183}
184
185pub type SharedReplicationManager = Arc<ReplicationManager>;
187
#[cfg(test)]
mod tests {
    use super::*;
    use ormdb_core::storage::{StorageConfig, StorageEngine};

    /// Primary address handed to the manager in replica tests. The code under
    /// test only stores and logs it.
    const PRIMARY_ADDR: &str = "localhost:5432";

    /// Opens a changelog on a temporary sled database.
    fn create_test_changelog() -> Arc<ChangeLog> {
        let db = sled::Config::new().temporary(true).open().unwrap();
        Arc::new(ChangeLog::open(&db).unwrap())
    }

    /// Builds a manager in replica mode backed by a storage engine in a fresh
    /// temporary directory.
    ///
    /// The `TempDir` is returned so the caller keeps it alive for the whole
    /// test; dropping it removes the directory the storage engine uses.
    fn create_test_replica() -> (ReplicationManager, tempfile::TempDir) {
        let changelog = create_test_changelog();

        let dir = tempfile::tempdir().unwrap();
        let storage = Arc::new(StorageEngine::open(StorageConfig::new(dir.path())).unwrap());
        let applier = Arc::new(ReplicaApplier::new(storage));

        let manager = ReplicationManager::new_replica(changelog, applier, PRIMARY_ADDR.to_string());
        (manager, dir)
    }

    #[tokio::test]
    async fn test_standalone_mode() {
        let manager = ReplicationManager::new_standalone(create_test_changelog());

        assert!(manager.can_write().await);
        assert!(!manager.is_replica().await);
        assert!(!manager.is_primary().await);
        assert!(manager.applied_lsn().is_none());

        let status = manager.status().await;
        assert!(matches!(status.role, ReplicationRole::Standalone));
    }

    #[tokio::test]
    async fn test_primary_mode() {
        let manager = ReplicationManager::new_primary(create_test_changelog());

        assert!(manager.can_write().await);
        assert!(manager.is_primary().await);
        assert!(!manager.is_replica().await);
        assert!(manager.applied_lsn().is_none());

        let status = manager.status().await;
        assert!(matches!(status.role, ReplicationRole::Primary));
    }

    #[tokio::test]
    async fn test_replica_mode() {
        let (manager, _dir) = create_test_replica();

        assert!(!manager.can_write().await);
        assert!(manager.is_replica().await);
        assert!(!manager.is_primary().await);
        assert!(manager.applied_lsn().is_some());

        let status = manager.status().await;
        assert!(matches!(status.role, ReplicationRole::Replica { .. }));
    }

    #[tokio::test]
    async fn test_promote_to_primary() {
        let (manager, _dir) = create_test_replica();

        assert!(!manager.can_write().await);

        manager.promote_to_primary().await;

        assert!(manager.can_write().await);
        assert!(manager.is_primary().await);
    }

    #[tokio::test]
    async fn test_promote_standalone_is_noop() {
        let manager = ReplicationManager::new_standalone(create_test_changelog());

        manager.promote_to_primary().await;

        // Only a replica is promoted; a standalone server keeps its role.
        assert!(!manager.is_primary().await);
        assert!(matches!(manager.role().await, ReplicationRole::Standalone));
    }

    #[tokio::test]
    async fn test_demote_to_replica() {
        let manager = ReplicationManager::new_primary(create_test_changelog());

        manager.demote_to_replica(PRIMARY_ADDR.to_string()).await;

        assert!(manager.is_replica().await);
        assert!(!manager.is_primary().await);
        assert!(!manager.can_write().await);

        match manager.role().await {
            ReplicationRole::Replica { primary_addr } => assert_eq!(primary_addr, PRIMARY_ADDR),
            _ => panic!("expected replica role after demotion"),
        }
    }

    #[tokio::test]
    async fn test_stream_changes_empty() {
        let manager = ReplicationManager::new_standalone(create_test_changelog());

        let response = manager.stream_changes(1, 10);

        assert!(response.entries.is_empty());
        assert_eq!(response.next_lsn, 1);
        assert!(!response.has_more);
    }
}