ormdb_server/replication/manager.rs

//! Replication manager for coordinating replication state.

use std::sync::Arc;

use tokio::sync::RwLock;
use tracing::{debug, info};

use ormdb_core::replication::{ChangeLog, ReplicaApplier};
use ormdb_proto::replication::{ReplicationRole, ReplicationStatus, StreamChangesResponse};

/// Replication manager that coordinates replication state and operations.
///
/// This manages:
/// - Server replication role (Primary, Replica, Standalone)
/// - Write access control (replicas are read-only)
/// - Change streaming for replica consumers
/// - Replication status reporting
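///
/// # Example
///
/// A minimal usage sketch (marked `ignore`, not compiled as a doctest); the
/// changelog setup mirrors the sled-backed helper used in the tests below.
///
/// ```ignore
/// # async fn example() {
/// let db = sled::Config::new().temporary(true).open().unwrap();
/// let changelog = std::sync::Arc::new(ChangeLog::open(&db).unwrap());
/// let manager = ReplicationManager::new_standalone(changelog);
/// assert!(manager.can_write().await);
/// # }
/// ```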
pub struct ReplicationManager {
    /// Current replication role.
    role: RwLock<ReplicationRole>,
    /// Changelog for streaming changes.
    changelog: Arc<ChangeLog>,
    /// Applier for applying changes (replica mode only).
    applier: Option<Arc<ReplicaApplier>>,
}

impl ReplicationManager {
    /// Create a new replication manager in primary mode.
    pub fn new_primary(changelog: Arc<ChangeLog>) -> Self {
        info!("Initializing replication manager in primary mode");
        Self {
            role: RwLock::new(ReplicationRole::Primary),
            changelog,
            applier: None,
        }
    }

    /// Create a new replication manager in standalone mode.
    pub fn new_standalone(changelog: Arc<ChangeLog>) -> Self {
        info!("Initializing replication manager in standalone mode");
        Self {
            role: RwLock::new(ReplicationRole::Standalone),
            changelog,
            applier: None,
        }
    }

    /// Create a new replication manager in replica mode.
    pub fn new_replica(
        changelog: Arc<ChangeLog>,
        applier: Arc<ReplicaApplier>,
        primary_addr: String,
    ) -> Self {
        info!(primary = %primary_addr, "Initializing replication manager in replica mode");
        Self {
            role: RwLock::new(ReplicationRole::Replica { primary_addr }),
            changelog,
            applier: Some(applier),
        }
    }

    /// Check if writes are allowed (i.e., not a replica).
    pub async fn can_write(&self) -> bool {
        let role = self.role.read().await;
        role.can_write()
    }

    /// Get the current replication role.
    pub async fn role(&self) -> ReplicationRole {
        self.role.read().await.clone()
    }

    /// Check if this server is a replica.
    pub async fn is_replica(&self) -> bool {
        let role = self.role.read().await;
        role.is_replica()
    }

    /// Check if this server is the primary.
    pub async fn is_primary(&self) -> bool {
        let role = self.role.read().await;
        role.is_primary()
    }

    /// Get the current replication status.
    pub async fn status(&self) -> ReplicationStatus {
        let role = self.role.read().await.clone();
        let current_lsn = self.changelog.current_lsn();

        let (lag_entries, lag_ms) = match &role {
            ReplicationRole::Replica { .. } => {
                if let Some(ref applier) = self.applier {
                    let applied = applier.applied_lsn();
                    let lag = current_lsn.saturating_sub(applied);
                    // TODO: compute lag_ms based on timestamps
                    (lag, 0)
                } else {
                    (0, 0)
                }
            }
            _ => (0, 0),
        };

        ReplicationStatus {
            role,
            current_lsn,
            lag_entries,
            lag_ms,
        }
    }

    /// Get the current LSN.
    pub fn current_lsn(&self) -> u64 {
        self.changelog.current_lsn()
    }

    /// Stream changes from the given LSN.
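    ///
    /// # Example
    ///
    /// A hypothetical catch-up loop on the consumer side (sketch only): pull
    /// batches until `has_more` is false, resuming from the returned `next_lsn`.
    ///
    /// ```ignore
    /// let mut lsn = 1;
    /// loop {
    ///     let resp = manager.stream_changes(lsn, 100);
    ///     // ... apply resp.entries on the replica side ...
    ///     lsn = resp.next_lsn;
    ///     if !resp.has_more {
    ///         break;
    ///     }
    /// }
    /// ```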
    pub fn stream_changes(&self, from_lsn: u64, batch_size: u32) -> StreamChangesResponse {
        let (entries, has_more) = self
            .changelog
            .scan_batch(from_lsn, batch_size as usize)
            .unwrap_or_else(|e| {
                debug!(error = %e, "failed to scan changelog");
                (vec![], false)
            });

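        // Resume after the last returned entry; if the batch was empty, stay at `from_lsn`.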
        let next_lsn = entries.last().map(|e| e.lsn + 1).unwrap_or(from_lsn);

        StreamChangesResponse::new(entries, next_lsn, has_more)
    }

    /// Stream changes with entity filter.
    pub fn stream_changes_filtered(
        &self,
        from_lsn: u64,
        batch_size: u32,
        entity_filter: Option<&[String]>,
    ) -> StreamChangesResponse {
        let (entries, has_more) = self
            .changelog
            .scan_filtered(from_lsn, batch_size as usize, entity_filter)
            .unwrap_or_else(|e| {
                debug!(error = %e, "failed to scan changelog");
                (vec![], false)
            });

        let next_lsn = entries.last().map(|e| e.lsn + 1).unwrap_or(from_lsn);

        StreamChangesResponse::new(entries, next_lsn, has_more)
    }

    /// Get the applied LSN (for replicas).
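    ///
    /// Returns `None` when no applier is configured (primary or standalone mode).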
    pub fn applied_lsn(&self) -> Option<u64> {
        self.applier.as_ref().map(|a| a.applied_lsn())
    }

    /// Get a reference to the changelog.
    pub fn changelog(&self) -> &ChangeLog {
        &self.changelog
    }

    /// Promote this server to primary.
    ///
    /// This is used when the primary fails and a replica needs to take over.
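    /// No-op unless the server is currently a replica.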
    pub async fn promote_to_primary(&self) {
        let mut role = self.role.write().await;
        if role.is_replica() {
            info!("Promoting server from replica to primary");
            *role = ReplicationRole::Primary;
        }
    }

    /// Demote this server to replica.
    ///
    /// This is used when another server becomes primary.
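    /// No-op if the server is already a replica.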
    pub async fn demote_to_replica(&self, primary_addr: String) {
        let mut role = self.role.write().await;
        if !role.is_replica() {
            info!(primary = %primary_addr, "Demoting server to replica");
            *role = ReplicationRole::Replica { primary_addr };
        }
    }
}

/// Shared replication manager handle.
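///
/// Typically created once at startup and cloned into request handlers, e.g.
/// (sketch only):
///
/// ```ignore
/// let shared: SharedReplicationManager = Arc::new(ReplicationManager::new_primary(changelog));
/// ```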
pub type SharedReplicationManager = Arc<ReplicationManager>;

#[cfg(test)]
mod tests {
    use super::*;
    use ormdb_core::storage::{StorageConfig, StorageEngine};

    fn create_test_changelog() -> Arc<ChangeLog> {
        let db = sled::Config::new().temporary(true).open().unwrap();
        Arc::new(ChangeLog::open(&db).unwrap())
    }

    #[tokio::test]
    async fn test_standalone_mode() {
        let changelog = create_test_changelog();
        let manager = ReplicationManager::new_standalone(changelog);

        assert!(manager.can_write().await);
        assert!(!manager.is_replica().await);
        assert!(!manager.is_primary().await);

        let status = manager.status().await;
        assert!(matches!(status.role, ReplicationRole::Standalone));
    }

    #[tokio::test]
    async fn test_primary_mode() {
        let changelog = create_test_changelog();
        let manager = ReplicationManager::new_primary(changelog);

        assert!(manager.can_write().await);
        assert!(manager.is_primary().await);
        assert!(!manager.is_replica().await);

        let status = manager.status().await;
        assert!(matches!(status.role, ReplicationRole::Primary));
    }

    #[tokio::test]
    async fn test_replica_mode() {
        let changelog = create_test_changelog();

        let dir = tempfile::tempdir().unwrap();
        let storage = Arc::new(
            StorageEngine::open(StorageConfig::new(dir.path())).unwrap()
        );
        let applier = Arc::new(ReplicaApplier::new(storage));

        let manager = ReplicationManager::new_replica(
            changelog,
            applier,
            "localhost:5432".to_string(),
        );

        assert!(!manager.can_write().await);
        assert!(manager.is_replica().await);
        assert!(!manager.is_primary().await);

        let status = manager.status().await;
        assert!(matches!(status.role, ReplicationRole::Replica { .. }));
    }

    #[tokio::test]
    async fn test_promote_to_primary() {
        let changelog = create_test_changelog();

        let dir = tempfile::tempdir().unwrap();
        let storage = Arc::new(
            StorageEngine::open(StorageConfig::new(dir.path())).unwrap()
        );
        let applier = Arc::new(ReplicaApplier::new(storage));

        let manager = ReplicationManager::new_replica(
            changelog,
            applier,
            "localhost:5432".to_string(),
        );

        assert!(!manager.can_write().await);

        manager.promote_to_primary().await;

        assert!(manager.can_write().await);
        assert!(manager.is_primary().await);
    }

    #[tokio::test]
    async fn test_stream_changes_empty() {
        let changelog = create_test_changelog();
        let manager = ReplicationManager::new_standalone(changelog);

        let response = manager.stream_changes(1, 10);

        assert!(response.entries.is_empty());
        assert_eq!(response.next_lsn, 1);
        assert!(!response.has_more);
    }
}