//! # Data Replication
//!
//! High-level data replication management for distributed RDF storage.
//! Works with Raft consensus to ensure consistent replication.
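//!
//! A minimal usage sketch (assuming these types are reachable at
//! `oxirs_cluster::replication`; the exact re-export path is an assumption):
//!
//! ```ignore
//! use oxirs_cluster::replication::ReplicationManager;
//!
//! // Node 1 manages replication for a three-node cluster.
//! let mut manager = ReplicationManager::with_raft_consensus(1);
//! manager.add_replica(2, "127.0.0.1:8081".to_string());
//! manager.add_replica(3, "127.0.0.1:8082".to_string());
//!
//! // With every replica healthy, a Raft majority is available.
//! assert!(manager.is_replication_healthy());
//! ```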

use crate::raft::OxirsNodeId;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tokio::time::sleep;

/// Replication strategy for the cluster
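///
/// A small selection sketch (interpreting `min_replicas` as the minimum
/// number of replicas that must acknowledge a write):
///
/// ```ignore
/// let strategy = ReplicationStrategy::SemiSynchronous { min_replicas: 2 };
/// assert_ne!(strategy, ReplicationStrategy::default()); // default is RaftConsensus
/// ```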
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ReplicationStrategy {
    /// Synchronous replication - wait for all replicas
    Synchronous,
    /// Asynchronous replication - fire and forget
    Asynchronous,
    /// Semi-synchronous - wait for minimum replicas
    SemiSynchronous { min_replicas: usize },
    /// Raft consensus - use Raft for replication
    RaftConsensus,
}

impl Default for ReplicationStrategy {
    fn default() -> Self {
        Self::RaftConsensus
    }
}

/// Information about a replica node
#[derive(Debug, Clone)]
pub struct ReplicaInfo {
    /// Unique node identifier
    pub node_id: OxirsNodeId,
    /// Address of the replica
    pub address: String,
    /// Last successfully applied log index
    pub last_applied_index: u64,
    /// Whether the replica is currently healthy
    pub is_healthy: bool,
    /// Last time we successfully communicated with this replica
    pub last_contact: SystemTime,
    /// Current replication lag in log entries
    pub replication_lag: u64,
    /// Network latency to this replica
    pub latency: Duration,
}

impl ReplicaInfo {
    /// Create a new replica info
    pub fn new(node_id: OxirsNodeId, address: String) -> Self {
        Self {
            node_id,
            address,
            last_applied_index: 0,
            is_healthy: true,
            last_contact: SystemTime::now(),
            replication_lag: 0,
            latency: Duration::from_millis(0),
        }
    }

    /// Check if replica is stale based on contact time
    pub fn is_stale(&self, threshold: Duration) -> bool {
        self.last_contact.elapsed().unwrap_or(Duration::MAX) > threshold
    }

    /// Update health status and contact time
    pub fn update_health(&mut self, is_healthy: bool) {
        self.is_healthy = is_healthy;
        if is_healthy {
            self.last_contact = SystemTime::now();
        }
    }
}

/// Replication statistics
#[derive(Debug, Clone, Default)]
pub struct ReplicationStats {
    pub total_replicas: usize,
    pub healthy_replicas: usize,
    pub average_lag: f64,
    pub max_lag: u64,
    pub min_lag: u64,
    pub average_latency: Duration,
    pub replication_throughput: f64, // operations per second
}

/// Replication manager for distributed RDF data
#[derive(Debug)]
pub struct ReplicationManager {
    strategy: ReplicationStrategy,
    replicas: HashMap<OxirsNodeId, ReplicaInfo>,
    local_node_id: OxirsNodeId,
    stats: ReplicationStats,
}

impl ReplicationManager {
    /// Create a new replication manager
    pub fn new(strategy: ReplicationStrategy, local_node_id: OxirsNodeId) -> Self {
        Self {
            strategy,
            replicas: HashMap::new(),
            local_node_id,
            stats: ReplicationStats::default(),
        }
    }

    /// Create a new replication manager with Raft consensus (default)
    pub fn with_raft_consensus(local_node_id: OxirsNodeId) -> Self {
        Self::new(ReplicationStrategy::RaftConsensus, local_node_id)
    }

    /// Add a replica to the replication set
    pub fn add_replica(&mut self, node_id: OxirsNodeId, address: String) -> bool {
        if node_id == self.local_node_id {
            tracing::warn!("Cannot add local node as replica");
            return false;
        }

        let replica_info = ReplicaInfo::new(node_id, address.clone());
        let is_new = !self.replicas.contains_key(&node_id);

        self.replicas.insert(node_id, replica_info);

        if is_new {
            tracing::info!("Added replica {} at {}", node_id, address);
            self.update_stats();
        }

        is_new
    }

    /// Remove a replica from the replication set
    pub fn remove_replica(&mut self, node_id: OxirsNodeId) -> bool {
        if let Some(replica) = self.replicas.remove(&node_id) {
            tracing::info!("Removed replica {} at {}", node_id, replica.address);
            self.update_stats();
            true
        } else {
            false
        }
    }

    /// Get all replicas
    pub fn get_replicas(&self) -> &HashMap<OxirsNodeId, ReplicaInfo> {
        &self.replicas
    }

    /// Get healthy replicas only
    pub fn get_healthy_replicas(&self) -> Vec<&ReplicaInfo> {
        self.replicas
            .values()
            .filter(|replica| replica.is_healthy)
            .collect()
    }

    /// Get replica by node ID
    pub fn get_replica(&self, node_id: OxirsNodeId) -> Option<&ReplicaInfo> {
        self.replicas.get(&node_id)
    }

    /// Update replica health status
    pub fn update_replica_health(&mut self, node_id: OxirsNodeId, is_healthy: bool) -> bool {
        if let Some(replica) = self.replicas.get_mut(&node_id) {
            let was_healthy = replica.is_healthy;
            replica.update_health(is_healthy);

            if was_healthy != is_healthy {
                tracing::info!(
                    "Replica {} health changed: {} -> {}",
                    node_id,
                    was_healthy,
                    is_healthy
                );
                self.update_stats();
            }

            true
        } else {
            false
        }
    }

    /// Update replica lag information
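    ///
    /// Worked example: if the leader's current log index is 100 and this
    /// replica has applied index 50, the recorded lag is 100 - 50 = 50 entries.
    /// A minimal sketch (`manager` as in the module-level example):
    ///
    /// ```ignore
    /// manager.update_replica_lag(2, 50, 100);
    /// assert_eq!(manager.get_replica(2).unwrap().replication_lag, 50);
    /// ```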
    pub fn update_replica_lag(
        &mut self,
        node_id: OxirsNodeId,
        applied_index: u64,
        current_index: u64,
    ) {
        if let Some(replica) = self.replicas.get_mut(&node_id) {
            replica.last_applied_index = applied_index;
            replica.replication_lag = current_index.saturating_sub(applied_index);
            self.update_stats();
        }
    }

    /// Check all replicas and mark stale ones as unhealthy
    pub async fn health_check(&mut self, stale_threshold: Duration) {
        let mut changed = false;

        for replica in self.replicas.values_mut() {
            let was_healthy = replica.is_healthy;

            if replica.is_stale(stale_threshold) {
                replica.is_healthy = false;
            }

            if was_healthy != replica.is_healthy {
                changed = true;
                tracing::warn!(
                    "Replica {} marked as unhealthy due to staleness",
                    replica.node_id
                );
            }
        }

        if changed {
            self.update_stats();
        }
    }

    /// Get the current replication strategy
    pub fn get_strategy(&self) -> &ReplicationStrategy {
        &self.strategy
    }

    /// Change the replication strategy
    pub fn set_strategy(&mut self, strategy: ReplicationStrategy) {
        if self.strategy != strategy {
            tracing::info!(
                "Changing replication strategy from {:?} to {:?}",
                self.strategy,
                strategy
            );
            self.strategy = strategy;
        }
    }

    /// Get replication statistics
    pub fn get_stats(&self) -> &ReplicationStats {
        &self.stats
    }

    /// Check if replication requirements are met
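    ///
    /// For `RaftConsensus` this is a simple majority count over the whole
    /// cluster (replicas plus the local node). Worked example for a
    /// three-node cluster: 2 replicas + 1 local = 3 nodes, majority = 2,
    /// so the local node plus one healthy replica is enough.
    ///
    /// ```ignore
    /// // `manager` as in the module-level example (RaftConsensus, replicas 2 and 3)
    /// manager.update_replica_health(3, false);
    /// assert!(manager.is_replication_healthy()); // local + replica 2 = 2 of 3
    /// ```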
    pub fn is_replication_healthy(&self) -> bool {
        let healthy_count = self.get_healthy_replicas().len();

        match &self.strategy {
            ReplicationStrategy::Synchronous => healthy_count == self.replicas.len(),
            ReplicationStrategy::Asynchronous => true, // Always considered healthy
            ReplicationStrategy::SemiSynchronous { min_replicas } => healthy_count >= *min_replicas,
            ReplicationStrategy::RaftConsensus => {
                // For Raft, we need a majority of nodes to be healthy
                let total_nodes = self.replicas.len() + 1; // +1 for local node
                let majority = (total_nodes / 2) + 1;
                healthy_count + 1 >= majority // +1 for local node
            }
        }
    }

    /// Get required replica count for the current strategy
    pub fn required_replica_count(&self) -> usize {
        match &self.strategy {
            ReplicationStrategy::Synchronous => self.replicas.len(),
            ReplicationStrategy::Asynchronous => 0,
            ReplicationStrategy::SemiSynchronous { min_replicas } => *min_replicas,
            ReplicationStrategy::RaftConsensus => {
                let total_nodes = self.replicas.len() + 1; // +1 for the local node
                (total_nodes / 2) + 1 - 1 // majority minus the local node, which is not a replica
            }
        }
    }

    /// Update internal statistics
    fn update_stats(&mut self) {
        let healthy_replicas_count = self.replicas.values().filter(|r| r.is_healthy).count();
        let healthy_lags: Vec<u64> = self
            .replicas
            .values()
            .filter(|r| r.is_healthy)
            .map(|r| r.replication_lag)
            .collect();
        let healthy_latencies: Vec<Duration> = self
            .replicas
            .values()
            .filter(|r| r.is_healthy)
            .map(|r| r.latency)
            .collect();

        self.stats.total_replicas = self.replicas.len();
        self.stats.healthy_replicas = healthy_replicas_count;

        if !healthy_lags.is_empty() {
            let total_lag: u64 = healthy_lags.iter().sum();
            self.stats.average_lag = total_lag as f64 / healthy_lags.len() as f64;
            self.stats.max_lag = healthy_lags.iter().copied().max().unwrap_or(0);
            self.stats.min_lag = healthy_lags.iter().copied().min().unwrap_or(0);

            let total_latency: Duration = healthy_latencies.iter().sum();
            self.stats.average_latency = total_latency / healthy_latencies.len() as u32;
        } else {
            self.stats.average_lag = 0.0;
            self.stats.max_lag = 0;
            self.stats.min_lag = 0;
            self.stats.average_latency = Duration::from_millis(0);
        }
    }

    /// Run periodic maintenance tasks
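    ///
    /// This loops forever, so a caller would normally drive it from a
    /// dedicated task. A minimal sketch (assuming the manager is moved into
    /// the task; sharing it with other code would need e.g. `Arc<Mutex<_>>`):
    ///
    /// ```ignore
    /// let mut manager = ReplicationManager::with_raft_consensus(1);
    /// tokio::spawn(async move {
    ///     manager.run_maintenance().await;
    /// });
    /// ```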
    pub async fn run_maintenance(&mut self) {
        const HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(30);
        const STALE_THRESHOLD: Duration = Duration::from_secs(60);

        loop {
            sleep(HEALTH_CHECK_INTERVAL).await;

            self.health_check(STALE_THRESHOLD).await;

            // Log stats periodically
            if self.stats.total_replicas > 0 {
                tracing::debug!(
                    "Replication stats: {}/{} healthy, avg lag: {:.1}, max lag: {}",
                    self.stats.healthy_replicas,
                    self.stats.total_replicas,
                    self.stats.average_lag,
                    self.stats.max_lag
                );
            }
        }
    }
}

/// Replication-related errors
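///
/// A sketch of how a caller might surface one of these errors; the
/// `check_quorum` helper below is hypothetical and not part of this module:
///
/// ```ignore
/// fn check_quorum(manager: &ReplicationManager) -> Result<(), ReplicationError> {
///     let available = manager.get_healthy_replicas().len();
///     let required = manager.required_replica_count();
///     if available < required {
///         return Err(ReplicationError::InsufficientReplicas { required, available });
///     }
///     Ok(())
/// }
/// ```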
#[derive(Debug, thiserror::Error)]
pub enum ReplicationError {
    #[error("Insufficient replicas: need {required}, have {available}")]
    InsufficientReplicas { required: usize, available: usize },

    #[error("Replica {node_id} is unhealthy")]
    UnhealthyReplica { node_id: OxirsNodeId },

    #[error("Replication timeout after {timeout:?}")]
    Timeout { timeout: Duration },

    #[error("Network error: {message}")]
    Network { message: String },

    #[error("Serialization error: {message}")]
    Serialization { message: String },
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_replication_strategy_default() {
        let strategy = ReplicationStrategy::default();
        assert_eq!(strategy, ReplicationStrategy::RaftConsensus);
    }

    #[test]
    fn test_replica_info_creation() {
        let replica = ReplicaInfo::new(1, "127.0.0.1:8080".to_string());

        assert_eq!(replica.node_id, 1);
        assert_eq!(replica.address, "127.0.0.1:8080");
        assert_eq!(replica.last_applied_index, 0);
        assert!(replica.is_healthy);
        assert_eq!(replica.replication_lag, 0);
        assert_eq!(replica.latency, Duration::from_millis(0));
    }

    #[test]
    fn test_replica_info_staleness() {
        let replica = ReplicaInfo::new(1, "127.0.0.1:8080".to_string());

        // Fresh replica should not be stale
        assert!(!replica.is_stale(Duration::from_secs(10)));

        // Wait a tiny bit to ensure elapsed time passes the threshold
        std::thread::sleep(Duration::from_micros(1));

        // Simulate old replica by checking against very short threshold
        assert!(replica.is_stale(Duration::from_nanos(1)));
    }

    #[test]
    fn test_replica_info_update_health() {
        let mut replica = ReplicaInfo::new(1, "127.0.0.1:8080".to_string());

        // Update to unhealthy
        replica.update_health(false);
        assert!(!replica.is_healthy);

        // Update to healthy
        replica.update_health(true);
        assert!(replica.is_healthy);
    }

    #[test]
    fn test_replication_manager_creation() {
        let manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        assert_eq!(manager.strategy, ReplicationStrategy::Synchronous);
        assert_eq!(manager.local_node_id, 1);
        assert!(manager.replicas.is_empty());
        assert_eq!(manager.stats.total_replicas, 0);
    }

    #[test]
    fn test_replication_manager_with_raft_consensus() {
        let manager = ReplicationManager::with_raft_consensus(1);

        assert_eq!(manager.strategy, ReplicationStrategy::RaftConsensus);
        assert_eq!(manager.local_node_id, 1);
    }

    #[test]
    fn test_replication_manager_add_replica() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        // Add replica
        assert!(manager.add_replica(2, "127.0.0.1:8081".to_string()));
        assert_eq!(manager.replicas.len(), 1);
        assert!(manager.replicas.contains_key(&2));

        // Adding same replica again should return false
        assert!(!manager.add_replica(2, "127.0.0.1:8081".to_string()));
        assert_eq!(manager.replicas.len(), 1);

        // Cannot add local node as replica
        assert!(!manager.add_replica(1, "127.0.0.1:8080".to_string()));
        assert_eq!(manager.replicas.len(), 1);
    }

    #[test]
    fn test_replication_manager_remove_replica() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());
        assert_eq!(manager.replicas.len(), 2);

        // Remove replica
        assert!(manager.remove_replica(2));
        assert_eq!(manager.replicas.len(), 1);
        assert!(!manager.replicas.contains_key(&2));

        // Removing non-existent replica should return false
        assert!(!manager.remove_replica(4));
        assert_eq!(manager.replicas.len(), 1);
    }

    #[test]
    fn test_replication_manager_get_healthy_replicas() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());

        // Mark one replica as unhealthy
        manager.update_replica_health(3, false);

        let healthy_replicas = manager.get_healthy_replicas();
        assert_eq!(healthy_replicas.len(), 1);
        assert_eq!(healthy_replicas[0].node_id, 2);
    }

    #[test]
    fn test_replication_manager_update_replica_health() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        manager.add_replica(2, "127.0.0.1:8081".to_string());

        // Update health status
        assert!(manager.update_replica_health(2, false));
        let replica = manager.get_replica(2).unwrap();
        assert!(!replica.is_healthy);

        // Update non-existent replica should return false
        assert!(!manager.update_replica_health(3, true));
    }

    #[test]
    fn test_replication_manager_update_replica_lag() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        manager.add_replica(2, "127.0.0.1:8081".to_string());

        // Update lag information
        manager.update_replica_lag(2, 50, 100);
        let replica = manager.get_replica(2).unwrap();
        assert_eq!(replica.last_applied_index, 50);
        assert_eq!(replica.replication_lag, 50);
    }

    #[tokio::test]
    async fn test_replication_manager_health_check() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());

        // Both replicas should be healthy initially
        assert_eq!(manager.get_healthy_replicas().len(), 2);

        // Run health check with very short threshold - should mark all as unhealthy
        manager.health_check(Duration::from_nanos(1)).await;
        assert_eq!(manager.get_healthy_replicas().len(), 0);
    }

    #[test]
    fn test_replication_manager_strategy_change() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        assert_eq!(manager.get_strategy(), &ReplicationStrategy::Synchronous);

        manager.set_strategy(ReplicationStrategy::Asynchronous);
        assert_eq!(manager.get_strategy(), &ReplicationStrategy::Asynchronous);
    }

    #[test]
    fn test_replication_manager_health_status() {
        let mut manager =
            ReplicationManager::new(ReplicationStrategy::SemiSynchronous { min_replicas: 2 }, 1);

        // Add replicas
        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());
        manager.add_replica(4, "127.0.0.1:8083".to_string());

        // All healthy - should meet requirements
        assert!(manager.is_replication_healthy());

        // Mark one as unhealthy - should still meet requirements (2 out of 3)
        manager.update_replica_health(4, false);
        assert!(manager.is_replication_healthy());

        // Mark another as unhealthy - should not meet requirements (1 out of 3)
        manager.update_replica_health(3, false);
        assert!(!manager.is_replication_healthy());
    }

    #[test]
    fn test_replication_manager_required_replica_count() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);
        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());

        // Synchronous requires all replicas
        assert_eq!(manager.required_replica_count(), 2);

        // Asynchronous requires no replicas
        manager.set_strategy(ReplicationStrategy::Asynchronous);
        assert_eq!(manager.required_replica_count(), 0);

        // Semi-synchronous requires minimum specified
        manager.set_strategy(ReplicationStrategy::SemiSynchronous { min_replicas: 1 });
        assert_eq!(manager.required_replica_count(), 1);

        // Raft consensus requires majority
        manager.set_strategy(ReplicationStrategy::RaftConsensus);
        // With 2 replicas + 1 local = 3 total, majority is 2, so we need 1 replica (since local is always available)
        assert_eq!(manager.required_replica_count(), 1);
    }

    #[test]
    fn test_replication_manager_raft_consensus_health() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::RaftConsensus, 1);

        // Single node cluster (1 local + 0 replicas = 1 total) - should be healthy
        assert!(manager.is_replication_healthy());

        // Add replicas to form 3-node cluster
        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());

        // All healthy (3 total, majority = 2, local + 2 replicas = 3) - should be healthy
        assert!(manager.is_replication_healthy());

        // Mark one replica as unhealthy (local + 1 replica = 2, still majority) - should be healthy
        manager.update_replica_health(3, false);
        assert!(manager.is_replication_healthy());

        // Mark both replicas as unhealthy (only local = 1, not majority) - should not be healthy
        manager.update_replica_health(2, false);
        assert!(!manager.is_replication_healthy());
    }

    #[test]
    fn test_replication_stats_default() {
        let stats = ReplicationStats::default();
        assert_eq!(stats.total_replicas, 0);
        assert_eq!(stats.healthy_replicas, 0);
        assert_eq!(stats.average_lag, 0.0);
        assert_eq!(stats.max_lag, 0);
        assert_eq!(stats.min_lag, 0);
        assert_eq!(stats.average_latency, Duration::from_millis(0));
        assert_eq!(stats.replication_throughput, 0.0);
    }

    #[test]
    fn test_replication_error_display() {
        let err = ReplicationError::InsufficientReplicas {
            required: 3,
            available: 1,
        };
        assert!(err
            .to_string()
            .contains("Insufficient replicas: need 3, have 1"));

        let err = ReplicationError::UnhealthyReplica { node_id: 42 };
        assert!(err.to_string().contains("Replica 42 is unhealthy"));

        let err = ReplicationError::Timeout {
            timeout: Duration::from_secs(5),
        };
        assert!(err.to_string().contains("Replication timeout after 5s"));

        let err = ReplicationError::Network {
            message: "connection failed".to_string(),
        };
        assert!(err.to_string().contains("Network error: connection failed"));

        let err = ReplicationError::Serialization {
            message: "json error".to_string(),
        };
        assert!(err.to_string().contains("Serialization error: json error"));
    }
}