ruvector_replication/
replica.rs

1//! Replica management and coordination
2//!
3//! Provides structures and logic for managing distributed replicas,
4//! including role management, health tracking, and promotion/demotion.
5
6use crate::{ReplicationError, Result};
7use chrono::{DateTime, Utc};
8use dashmap::DashMap;
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12use std::time::Duration;
13use uuid::Uuid;
14
15/// Role of a replica in the replication topology
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17pub enum ReplicaRole {
18    /// Primary replica that handles writes
19    Primary,
20    /// Secondary replica that replicates from primary
21    Secondary,
22    /// Witness replica for quorum without data replication
23    Witness,
24}
25
26/// Current status of a replica
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
28pub enum ReplicaStatus {
29    /// Replica is online and healthy
30    Healthy,
31    /// Replica is lagging behind
32    Lagging,
33    /// Replica is offline or unreachable
34    Offline,
35    /// Replica is recovering
36    Recovering,
37}
38
39/// Represents a single replica in the replication topology
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct Replica {
42    /// Unique identifier for the replica
43    pub id: String,
44    /// Network address of the replica
45    pub address: String,
46    /// Current role of the replica
47    pub role: ReplicaRole,
48    /// Current status of the replica
49    pub status: ReplicaStatus,
50    /// Replication lag in milliseconds
51    pub lag_ms: u64,
52    /// Last known position in the replication log
53    pub log_position: u64,
54    /// Last heartbeat timestamp
55    pub last_heartbeat: DateTime<Utc>,
56    /// Priority for failover (higher is better)
57    pub priority: u32,
58}
59
60impl Replica {
61    /// Create a new replica
62    pub fn new(id: impl Into<String>, address: impl Into<String>, role: ReplicaRole) -> Self {
63        Self {
64            id: id.into(),
65            address: address.into(),
66            role,
67            status: ReplicaStatus::Healthy,
68            lag_ms: 0,
69            log_position: 0,
70            last_heartbeat: Utc::now(),
71            priority: 100,
72        }
73    }
74
75    /// Check if the replica is healthy
76    pub fn is_healthy(&self) -> bool {
77        self.status == ReplicaStatus::Healthy && self.lag_ms < 5000
78    }
79
80    /// Check if the replica is available for reads
81    pub fn is_readable(&self) -> bool {
82        matches!(self.status, ReplicaStatus::Healthy | ReplicaStatus::Lagging)
83    }
84
85    /// Check if the replica is available for writes
86    pub fn is_writable(&self) -> bool {
87        self.role == ReplicaRole::Primary && self.status == ReplicaStatus::Healthy
88    }
89
90    /// Update the replica's lag
91    pub fn update_lag(&mut self, lag_ms: u64) {
92        self.lag_ms = lag_ms;
93        if lag_ms > 5000 {
94            self.status = ReplicaStatus::Lagging;
95        } else if self.status == ReplicaStatus::Lagging {
96            self.status = ReplicaStatus::Healthy;
97        }
98    }
99
100    /// Update the replica's log position
101    pub fn update_position(&mut self, position: u64) {
102        self.log_position = position;
103    }
104
105    /// Record a heartbeat
106    pub fn heartbeat(&mut self) {
107        self.last_heartbeat = Utc::now();
108        if self.status == ReplicaStatus::Offline {
109            self.status = ReplicaStatus::Recovering;
110        }
111    }
112
113    /// Check if the replica has timed out
114    pub fn is_timed_out(&self, timeout: Duration) -> bool {
115        let elapsed = Utc::now()
116            .signed_duration_since(self.last_heartbeat)
117            .to_std()
118            .unwrap_or(Duration::MAX);
119        elapsed > timeout
120    }
121}
122
/// Manages a set of replicas
///
/// Tracks the replicas themselves, which of them (if any) is the current
/// primary, and the quorum size used by the quorum check.
pub struct ReplicaSet {
    /// Cluster identifier
    cluster_id: String,
    /// Map of replica ID to replica
    replicas: Arc<DashMap<String, Replica>>,
    /// Current primary replica ID (`None` when the set has no primary)
    primary_id: Arc<RwLock<Option<String>>>,
    /// Minimum number of replicas for quorum
    quorum_size: Arc<RwLock<usize>>,
}
134
135impl ReplicaSet {
136    /// Create a new replica set
137    pub fn new(cluster_id: impl Into<String>) -> Self {
138        Self {
139            cluster_id: cluster_id.into(),
140            replicas: Arc::new(DashMap::new()),
141            primary_id: Arc::new(RwLock::new(None)),
142            quorum_size: Arc::new(RwLock::new(1)),
143        }
144    }
145
146    /// Add a replica to the set
147    pub fn add_replica(
148        &mut self,
149        id: impl Into<String>,
150        address: impl Into<String>,
151        role: ReplicaRole,
152    ) -> Result<()> {
153        let id = id.into();
154        let replica = Replica::new(id.clone(), address, role);
155
156        if role == ReplicaRole::Primary {
157            let mut primary = self.primary_id.write();
158            if primary.is_some() {
159                return Err(ReplicationError::InvalidState(
160                    "Primary replica already exists".to_string(),
161                ));
162            }
163            *primary = Some(id.clone());
164        }
165
166        self.replicas.insert(id, replica);
167        self.update_quorum_size();
168        Ok(())
169    }
170
171    /// Remove a replica from the set
172    pub fn remove_replica(&mut self, id: &str) -> Result<()> {
173        let replica = self
174            .replicas
175            .remove(id)
176            .ok_or_else(|| ReplicationError::ReplicaNotFound(id.to_string()))?;
177
178        if replica.1.role == ReplicaRole::Primary {
179            let mut primary = self.primary_id.write();
180            *primary = None;
181        }
182
183        self.update_quorum_size();
184        Ok(())
185    }
186
187    /// Get a replica by ID
188    pub fn get_replica(&self, id: &str) -> Option<Replica> {
189        self.replicas.get(id).map(|r| r.clone())
190    }
191
192    /// Get the current primary replica
193    pub fn get_primary(&self) -> Option<Replica> {
194        let primary_id = self.primary_id.read();
195        primary_id
196            .as_ref()
197            .and_then(|id| self.replicas.get(id).map(|r| r.clone()))
198    }
199
200    /// Get all secondary replicas
201    pub fn get_secondaries(&self) -> Vec<Replica> {
202        self.replicas
203            .iter()
204            .filter(|r| r.role == ReplicaRole::Secondary)
205            .map(|r| r.clone())
206            .collect()
207    }
208
209    /// Get all healthy replicas
210    pub fn get_healthy_replicas(&self) -> Vec<Replica> {
211        self.replicas
212            .iter()
213            .filter(|r| r.is_healthy())
214            .map(|r| r.clone())
215            .collect()
216    }
217
218    /// Promote a secondary to primary
219    pub fn promote_to_primary(&mut self, id: &str) -> Result<()> {
220        // Get the replica and verify it exists
221        let mut replica = self
222            .replicas
223            .get_mut(id)
224            .ok_or_else(|| ReplicationError::ReplicaNotFound(id.to_string()))?;
225
226        if replica.role == ReplicaRole::Primary {
227            return Ok(());
228        }
229
230        if replica.role == ReplicaRole::Witness {
231            return Err(ReplicationError::InvalidState(
232                "Cannot promote witness to primary".to_string(),
233            ));
234        }
235
236        // Demote current primary if exists
237        let old_primary_id = {
238            let mut primary = self.primary_id.write();
239            primary.take()
240        };
241
242        if let Some(old_id) = old_primary_id {
243            if let Some(mut old_primary) = self.replicas.get_mut(&old_id) {
244                old_primary.role = ReplicaRole::Secondary;
245            }
246        }
247
248        // Promote new primary
249        replica.role = ReplicaRole::Primary;
250        let mut primary = self.primary_id.write();
251        *primary = Some(id.to_string());
252
253        tracing::info!("Promoted replica {} to primary", id);
254        Ok(())
255    }
256
257    /// Demote a primary to secondary
258    pub fn demote_to_secondary(&mut self, id: &str) -> Result<()> {
259        let mut replica = self
260            .replicas
261            .get_mut(id)
262            .ok_or_else(|| ReplicationError::ReplicaNotFound(id.to_string()))?;
263
264        if replica.role != ReplicaRole::Primary {
265            return Ok(());
266        }
267
268        replica.role = ReplicaRole::Secondary;
269        let mut primary = self.primary_id.write();
270        *primary = None;
271
272        tracing::info!("Demoted replica {} to secondary", id);
273        Ok(())
274    }
275
276    /// Check if quorum is available
277    pub fn has_quorum(&self) -> bool {
278        let healthy_count = self
279            .replicas
280            .iter()
281            .filter(|r| r.is_healthy() && r.role != ReplicaRole::Witness)
282            .count();
283        let quorum = *self.quorum_size.read();
284        healthy_count >= quorum
285    }
286
287    /// Get the required quorum size
288    pub fn get_quorum_size(&self) -> usize {
289        *self.quorum_size.read()
290    }
291
292    /// Set the quorum size
293    pub fn set_quorum_size(&self, size: usize) {
294        *self.quorum_size.write() = size;
295    }
296
297    /// Update quorum size based on replica count
298    fn update_quorum_size(&self) {
299        let replica_count = self
300            .replicas
301            .iter()
302            .filter(|r| r.role != ReplicaRole::Witness)
303            .count();
304        let quorum = (replica_count / 2) + 1;
305        *self.quorum_size.write() = quorum;
306    }
307
308    /// Get all replica IDs
309    pub fn replica_ids(&self) -> Vec<String> {
310        self.replicas.iter().map(|r| r.id.clone()).collect()
311    }
312
313    /// Get replica count
314    pub fn replica_count(&self) -> usize {
315        self.replicas.len()
316    }
317
318    /// Get the cluster ID
319    pub fn cluster_id(&self) -> &str {
320        &self.cluster_id
321    }
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `cluster-1` with primary `r1` on port 9001, followed by
    /// `secondaries` secondaries named `r2`, `r3`, ... on the ports after it.
    fn cluster(secondaries: u16) -> ReplicaSet {
        let mut set = ReplicaSet::new("cluster-1");
        set.add_replica("r1", "127.0.0.1:9001", ReplicaRole::Primary)
            .unwrap();
        for offset in 0..secondaries {
            let n = offset + 2;
            set.add_replica(
                format!("r{}", n),
                format!("127.0.0.1:{}", 9000 + n),
                ReplicaRole::Secondary,
            )
            .unwrap();
        }
        set
    }

    /// A freshly created primary is healthy and writable.
    #[test]
    fn test_replica_creation() {
        let replica = Replica::new("r1", "127.0.0.1:9001", ReplicaRole::Primary);

        assert_eq!(replica.id, "r1");
        assert_eq!(replica.role, ReplicaRole::Primary);
        assert!(replica.is_healthy());
        assert!(replica.is_writable());
    }

    /// A set with one primary and one secondary reports both correctly.
    #[test]
    fn test_replica_set() {
        let set = cluster(1);

        assert_eq!(set.replica_count(), 2);
        assert!(set.get_primary().is_some());
        assert_eq!(set.get_secondaries().len(), 1);
    }

    /// Promoting the secondary makes it the primary of the set.
    #[test]
    fn test_promotion() {
        let mut set = cluster(1);

        set.promote_to_primary("r2").unwrap();

        let primary = set.get_primary().unwrap();
        assert_eq!(primary.id, "r2");
        assert_eq!(primary.role, ReplicaRole::Primary);
    }

    /// Three data replicas need a quorum of two, which three healthy ones meet.
    #[test]
    fn test_quorum() {
        let set = cluster(2);

        assert_eq!(set.get_quorum_size(), 2);
        assert!(set.has_quorum());
    }
}