1use crate::{ReplicationError, Result};
7use chrono::{DateTime, Utc};
8use dashmap::DashMap;
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12use std::time::Duration;
13use uuid::Uuid;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17pub enum ReplicaRole {
18 Primary,
20 Secondary,
22 Witness,
24}
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
28pub enum ReplicaStatus {
29 Healthy,
31 Lagging,
33 Offline,
35 Recovering,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct Replica {
42 pub id: String,
44 pub address: String,
46 pub role: ReplicaRole,
48 pub status: ReplicaStatus,
50 pub lag_ms: u64,
52 pub log_position: u64,
54 pub last_heartbeat: DateTime<Utc>,
56 pub priority: u32,
58}
59
60impl Replica {
61 pub fn new(id: impl Into<String>, address: impl Into<String>, role: ReplicaRole) -> Self {
63 Self {
64 id: id.into(),
65 address: address.into(),
66 role,
67 status: ReplicaStatus::Healthy,
68 lag_ms: 0,
69 log_position: 0,
70 last_heartbeat: Utc::now(),
71 priority: 100,
72 }
73 }
74
75 pub fn is_healthy(&self) -> bool {
77 self.status == ReplicaStatus::Healthy && self.lag_ms < 5000
78 }
79
80 pub fn is_readable(&self) -> bool {
82 matches!(self.status, ReplicaStatus::Healthy | ReplicaStatus::Lagging)
83 }
84
85 pub fn is_writable(&self) -> bool {
87 self.role == ReplicaRole::Primary && self.status == ReplicaStatus::Healthy
88 }
89
90 pub fn update_lag(&mut self, lag_ms: u64) {
92 self.lag_ms = lag_ms;
93 if lag_ms > 5000 {
94 self.status = ReplicaStatus::Lagging;
95 } else if self.status == ReplicaStatus::Lagging {
96 self.status = ReplicaStatus::Healthy;
97 }
98 }
99
100 pub fn update_position(&mut self, position: u64) {
102 self.log_position = position;
103 }
104
105 pub fn heartbeat(&mut self) {
107 self.last_heartbeat = Utc::now();
108 if self.status == ReplicaStatus::Offline {
109 self.status = ReplicaStatus::Recovering;
110 }
111 }
112
113 pub fn is_timed_out(&self, timeout: Duration) -> bool {
115 let elapsed = Utc::now()
116 .signed_duration_since(self.last_heartbeat)
117 .to_std()
118 .unwrap_or(Duration::MAX);
119 elapsed > timeout
120 }
121}
122
123pub struct ReplicaSet {
125 cluster_id: String,
127 replicas: Arc<DashMap<String, Replica>>,
129 primary_id: Arc<RwLock<Option<String>>>,
131 quorum_size: Arc<RwLock<usize>>,
133}
134
135impl ReplicaSet {
136 pub fn new(cluster_id: impl Into<String>) -> Self {
138 Self {
139 cluster_id: cluster_id.into(),
140 replicas: Arc::new(DashMap::new()),
141 primary_id: Arc::new(RwLock::new(None)),
142 quorum_size: Arc::new(RwLock::new(1)),
143 }
144 }
145
146 pub fn add_replica(
148 &mut self,
149 id: impl Into<String>,
150 address: impl Into<String>,
151 role: ReplicaRole,
152 ) -> Result<()> {
153 let id = id.into();
154 let replica = Replica::new(id.clone(), address, role);
155
156 if role == ReplicaRole::Primary {
157 let mut primary = self.primary_id.write();
158 if primary.is_some() {
159 return Err(ReplicationError::InvalidState(
160 "Primary replica already exists".to_string(),
161 ));
162 }
163 *primary = Some(id.clone());
164 }
165
166 self.replicas.insert(id, replica);
167 self.update_quorum_size();
168 Ok(())
169 }
170
171 pub fn remove_replica(&mut self, id: &str) -> Result<()> {
173 let replica = self
174 .replicas
175 .remove(id)
176 .ok_or_else(|| ReplicationError::ReplicaNotFound(id.to_string()))?;
177
178 if replica.1.role == ReplicaRole::Primary {
179 let mut primary = self.primary_id.write();
180 *primary = None;
181 }
182
183 self.update_quorum_size();
184 Ok(())
185 }
186
187 pub fn get_replica(&self, id: &str) -> Option<Replica> {
189 self.replicas.get(id).map(|r| r.clone())
190 }
191
192 pub fn get_primary(&self) -> Option<Replica> {
194 let primary_id = self.primary_id.read();
195 primary_id
196 .as_ref()
197 .and_then(|id| self.replicas.get(id).map(|r| r.clone()))
198 }
199
200 pub fn get_secondaries(&self) -> Vec<Replica> {
202 self.replicas
203 .iter()
204 .filter(|r| r.role == ReplicaRole::Secondary)
205 .map(|r| r.clone())
206 .collect()
207 }
208
209 pub fn get_healthy_replicas(&self) -> Vec<Replica> {
211 self.replicas
212 .iter()
213 .filter(|r| r.is_healthy())
214 .map(|r| r.clone())
215 .collect()
216 }
217
218 pub fn promote_to_primary(&mut self, id: &str) -> Result<()> {
220 let mut replica = self
222 .replicas
223 .get_mut(id)
224 .ok_or_else(|| ReplicationError::ReplicaNotFound(id.to_string()))?;
225
226 if replica.role == ReplicaRole::Primary {
227 return Ok(());
228 }
229
230 if replica.role == ReplicaRole::Witness {
231 return Err(ReplicationError::InvalidState(
232 "Cannot promote witness to primary".to_string(),
233 ));
234 }
235
236 let old_primary_id = {
238 let mut primary = self.primary_id.write();
239 primary.take()
240 };
241
242 if let Some(old_id) = old_primary_id {
243 if let Some(mut old_primary) = self.replicas.get_mut(&old_id) {
244 old_primary.role = ReplicaRole::Secondary;
245 }
246 }
247
248 replica.role = ReplicaRole::Primary;
250 let mut primary = self.primary_id.write();
251 *primary = Some(id.to_string());
252
253 tracing::info!("Promoted replica {} to primary", id);
254 Ok(())
255 }
256
257 pub fn demote_to_secondary(&mut self, id: &str) -> Result<()> {
259 let mut replica = self
260 .replicas
261 .get_mut(id)
262 .ok_or_else(|| ReplicationError::ReplicaNotFound(id.to_string()))?;
263
264 if replica.role != ReplicaRole::Primary {
265 return Ok(());
266 }
267
268 replica.role = ReplicaRole::Secondary;
269 let mut primary = self.primary_id.write();
270 *primary = None;
271
272 tracing::info!("Demoted replica {} to secondary", id);
273 Ok(())
274 }
275
276 pub fn has_quorum(&self) -> bool {
278 let healthy_count = self
279 .replicas
280 .iter()
281 .filter(|r| r.is_healthy() && r.role != ReplicaRole::Witness)
282 .count();
283 let quorum = *self.quorum_size.read();
284 healthy_count >= quorum
285 }
286
287 pub fn get_quorum_size(&self) -> usize {
289 *self.quorum_size.read()
290 }
291
292 pub fn set_quorum_size(&self, size: usize) {
294 *self.quorum_size.write() = size;
295 }
296
297 fn update_quorum_size(&self) {
299 let replica_count = self
300 .replicas
301 .iter()
302 .filter(|r| r.role != ReplicaRole::Witness)
303 .count();
304 let quorum = (replica_count / 2) + 1;
305 *self.quorum_size.write() = quorum;
306 }
307
308 pub fn replica_ids(&self) -> Vec<String> {
310 self.replicas.iter().map(|r| r.id.clone()).collect()
311 }
312
313 pub fn replica_count(&self) -> usize {
315 self.replicas.len()
316 }
317
318 pub fn cluster_id(&self) -> &str {
320 &self.cluster_id
321 }
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `cluster-1` with its replicas:
    /// - primary `r1` on `127.0.0.1:9001`;
    /// - then `secondaries` secondaries `r2`, `r3`, … on ports 9002, 9003, ….
    fn cluster_with(secondaries: usize) -> ReplicaSet {
        let mut set = ReplicaSet::new("cluster-1");
        set.add_replica("r1", "127.0.0.1:9001", ReplicaRole::Primary)
            .unwrap();
        for n in 2..secondaries + 2 {
            set.add_replica(
                format!("r{}", n),
                format!("127.0.0.1:900{}", n),
                ReplicaRole::Secondary,
            )
            .unwrap();
        }
        set
    }

    #[test]
    fn test_replica_creation() {
        let fresh = Replica::new("r1", "127.0.0.1:9001", ReplicaRole::Primary);
        assert_eq!(fresh.id, "r1");
        assert_eq!(fresh.role, ReplicaRole::Primary);
        assert!(fresh.is_healthy(), "a new replica starts healthy");
        assert!(fresh.is_writable(), "a new healthy primary is writable");
    }

    #[test]
    fn test_replica_set() {
        let set = cluster_with(1);

        assert_eq!(set.replica_count(), 2);
        assert!(set.get_primary().is_some());
        assert_eq!(set.get_secondaries().len(), 1);
    }

    #[test]
    fn test_promotion() {
        let mut set = cluster_with(1);

        set.promote_to_primary("r2").unwrap();

        let new_primary = set.get_primary().expect("r2 should now be primary");
        assert_eq!(new_primary.id, "r2");
        assert_eq!(new_primary.role, ReplicaRole::Primary);
    }

    #[test]
    fn test_quorum() {
        let set = cluster_with(2);

        assert_eq!(set.get_quorum_size(), 2);
        assert!(set.has_quorum());
    }
}