//! Replication management for oxirs cluster nodes: replica bookkeeping,
//! staleness-based health checks, lag statistics, and strategy-aware
//! health evaluation.

use crate::raft::OxirsNodeId;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tokio::time::sleep;

/// Strategy that determines how writes are replicated to other nodes.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ReplicationStrategy {
    /// Wait for every replica to acknowledge before a write is durable.
    Synchronous,
    /// Acknowledge locally and replicate to followers in the background.
    Asynchronous,
    /// Wait for at least `min_replicas` acknowledgements.
    SemiSynchronous { min_replicas: usize },
    /// Delegate commit decisions to Raft: a write commits once a majority
    /// quorum has accepted it.
    RaftConsensus,
}

impl Default for ReplicationStrategy {
    fn default() -> Self {
        Self::RaftConsensus
    }
}

/// Replication state tracked for a single remote replica.
#[derive(Debug, Clone)]
pub struct ReplicaInfo {
    /// Unique identifier of the replica node.
    pub node_id: OxirsNodeId,
    /// Network address of the replica.
    pub address: String,
    /// Last log index the replica is known to have applied.
    pub last_applied_index: u64,
    /// Whether the replica is currently considered healthy.
    pub is_healthy: bool,
    /// Time of the last successful contact with the replica.
    pub last_contact: SystemTime,
    /// Number of log entries this replica trails behind the leader.
    pub replication_lag: u64,
    /// Most recently observed round-trip latency to the replica.
    pub latency: Duration,
}

impl ReplicaInfo {
    /// Creates a record for a replica that starts out healthy with no lag.
    pub fn new(node_id: OxirsNodeId, address: String) -> Self {
        Self {
            node_id,
            address,
            last_applied_index: 0,
            is_healthy: true,
            last_contact: SystemTime::now(),
            replication_lag: 0,
            latency: Duration::from_millis(0),
        }
    }

    /// Returns `true` if the replica has not been heard from within `threshold`.
    pub fn is_stale(&self, threshold: Duration) -> bool {
        self.last_contact.elapsed().unwrap_or(Duration::MAX) > threshold
    }

    /// Sets the health flag, refreshing `last_contact` on a healthy report.
    pub fn update_health(&mut self, is_healthy: bool) {
        self.is_healthy = is_healthy;
        if is_healthy {
            self.last_contact = SystemTime::now();
        }
    }
}

/// Aggregate replication statistics across all known replicas.
#[derive(Debug, Clone, Default)]
pub struct ReplicationStats {
    pub total_replicas: usize,
    pub healthy_replicas: usize,
    pub average_lag: f64,
    pub max_lag: u64,
    pub min_lag: u64,
    pub average_latency: Duration,
    pub replication_throughput: f64,
}

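/// Coordinates replication from the local node to a set of remote replicas,
/// tracking per-replica health and aggregate statistics.
///
/// A minimal usage sketch (illustrative only; marked `ignore` because the
/// surrounding crate paths are assumed rather than imported here):
///
/// ```ignore
/// let mut manager = ReplicationManager::with_raft_consensus(1);
/// manager.add_replica(2, "127.0.0.1:8081".to_string());
/// manager.add_replica(3, "127.0.0.1:8082".to_string());
///
/// // Two replicas plus the local node form a 3-node cluster: the Raft
/// // majority is 2, so one remote acknowledgement is required per write.
/// assert_eq!(manager.required_replica_count(), 1);
/// assert!(manager.is_replication_healthy());
/// ```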
#[derive(Debug)]
pub struct ReplicationManager {
    strategy: ReplicationStrategy,
    replicas: HashMap<OxirsNodeId, ReplicaInfo>,
    local_node_id: OxirsNodeId,
    stats: ReplicationStats,
}

impl ReplicationManager {
    /// Creates a manager for `local_node_id` using the given strategy.
    pub fn new(strategy: ReplicationStrategy, local_node_id: OxirsNodeId) -> Self {
        Self {
            strategy,
            replicas: HashMap::new(),
            local_node_id,
            stats: ReplicationStats::default(),
        }
    }

    /// Convenience constructor using the default `RaftConsensus` strategy.
    pub fn with_raft_consensus(local_node_id: OxirsNodeId) -> Self {
        Self::new(ReplicationStrategy::RaftConsensus, local_node_id)
    }

    /// Registers a replica. Returns `true` if the node was newly added;
    /// re-adding a known node refreshes its record but returns `false`.
    /// The local node itself cannot be added as a replica.
    pub fn add_replica(&mut self, node_id: OxirsNodeId, address: String) -> bool {
        if node_id == self.local_node_id {
            tracing::warn!("Cannot add local node as replica");
            return false;
        }

        let replica_info = ReplicaInfo::new(node_id, address.clone());
        let is_new = !self.replicas.contains_key(&node_id);

        self.replicas.insert(node_id, replica_info);

        if is_new {
            tracing::info!("Added replica {} at {}", node_id, address);
            self.update_stats();
        }

        is_new
    }

    /// Removes a replica, returning `true` if it was known.
    pub fn remove_replica(&mut self, node_id: OxirsNodeId) -> bool {
        if let Some(replica) = self.replicas.remove(&node_id) {
            tracing::info!("Removed replica {} at {}", node_id, replica.address);
            self.update_stats();
            true
        } else {
            false
        }
    }

    /// Returns all known replicas keyed by node id.
    pub fn get_replicas(&self) -> &HashMap<OxirsNodeId, ReplicaInfo> {
        &self.replicas
    }

    /// Returns the replicas currently marked healthy.
    pub fn get_healthy_replicas(&self) -> Vec<&ReplicaInfo> {
        self.replicas
            .values()
            .filter(|replica| replica.is_healthy)
            .collect()
    }

    /// Looks up a single replica by node id.
    pub fn get_replica(&self, node_id: OxirsNodeId) -> Option<&ReplicaInfo> {
        self.replicas.get(&node_id)
    }

    /// Updates a replica's health flag, returning `false` for unknown nodes.
    pub fn update_replica_health(&mut self, node_id: OxirsNodeId, is_healthy: bool) -> bool {
        if let Some(replica) = self.replicas.get_mut(&node_id) {
            let was_healthy = replica.is_healthy;
            replica.update_health(is_healthy);

            if was_healthy != is_healthy {
                tracing::info!(
                    "Replica {} health changed: {} -> {}",
                    node_id,
                    was_healthy,
                    is_healthy
                );
                self.update_stats();
            }

            true
        } else {
            false
        }
    }

    /// Records a replica's applied log index and derives its lag relative to
    /// `current_index` (saturating at zero if the replica is ahead).
    pub fn update_replica_lag(
        &mut self,
        node_id: OxirsNodeId,
        applied_index: u64,
        current_index: u64,
    ) {
        if let Some(replica) = self.replicas.get_mut(&node_id) {
            replica.last_applied_index = applied_index;
            replica.replication_lag = current_index.saturating_sub(applied_index);
            self.update_stats();
        }
    }

    /// Marks replicas that have not been heard from within `stale_threshold`
    /// as unhealthy, refreshing the statistics if any state changed.
    pub async fn health_check(&mut self, stale_threshold: Duration) {
        let mut changed = false;

        for replica in self.replicas.values_mut() {
            let was_healthy = replica.is_healthy;

            if replica.is_stale(stale_threshold) {
                replica.is_healthy = false;
            }

            if was_healthy != replica.is_healthy {
                changed = true;
                tracing::warn!(
                    "Replica {} marked as unhealthy due to staleness",
                    replica.node_id
                );
            }
        }

        if changed {
            self.update_stats();
        }
    }

    /// Returns the active replication strategy.
    pub fn get_strategy(&self) -> &ReplicationStrategy {
        &self.strategy
    }

    /// Switches to a new replication strategy, logging the transition.
    pub fn set_strategy(&mut self, strategy: ReplicationStrategy) {
        if self.strategy != strategy {
            tracing::info!(
                "Changing replication strategy from {:?} to {:?}",
                self.strategy,
                strategy
            );
            self.strategy = strategy;
        }
    }

    /// Returns the most recently computed replication statistics.
    pub fn get_stats(&self) -> &ReplicationStats {
        &self.stats
    }

    /// Returns whether replication currently satisfies the active strategy.
    pub fn is_replication_healthy(&self) -> bool {
        let healthy_count = self.get_healthy_replicas().len();

        match &self.strategy {
            ReplicationStrategy::Synchronous => healthy_count == self.replicas.len(),
            ReplicationStrategy::Asynchronous => true,
            ReplicationStrategy::SemiSynchronous { min_replicas } => {
                healthy_count >= *min_replicas
            }
            ReplicationStrategy::RaftConsensus => {
                // The local node participates in the quorum, so a cluster of
                // `replicas + 1` nodes needs a majority of `total / 2 + 1`
                // healthy members (local node included).
                let total_nodes = self.replicas.len() + 1;
                let majority = (total_nodes / 2) + 1;
                healthy_count + 1 >= majority
            }
        }
    }

    /// Returns how many replicas (excluding the local node) must acknowledge
    /// a write under the active strategy.
    pub fn required_replica_count(&self) -> usize {
        match &self.strategy {
            ReplicationStrategy::Synchronous => self.replicas.len(),
            ReplicationStrategy::Asynchronous => 0,
            ReplicationStrategy::SemiSynchronous { min_replicas } => *min_replicas,
            ReplicationStrategy::RaftConsensus => {
                // A majority is `total / 2 + 1` votes; the local node supplies
                // one of them, so `total / 2` remote acknowledgements remain.
                let total_nodes = self.replicas.len() + 1;
                total_nodes / 2
            }
        }
    }

    /// Recomputes the aggregate statistics over the currently healthy replicas.
    fn update_stats(&mut self) {
        let healthy_replicas_count = self.replicas.values().filter(|r| r.is_healthy).count();
        let healthy_lags: Vec<u64> = self
            .replicas
            .values()
            .filter(|r| r.is_healthy)
            .map(|r| r.replication_lag)
            .collect();
        let healthy_latencies: Vec<Duration> = self
            .replicas
            .values()
            .filter(|r| r.is_healthy)
            .map(|r| r.latency)
            .collect();

        self.stats.total_replicas = self.replicas.len();
        self.stats.healthy_replicas = healthy_replicas_count;

        if !healthy_lags.is_empty() {
            let total_lag: u64 = healthy_lags.iter().sum();
            self.stats.average_lag = total_lag as f64 / healthy_lags.len() as f64;
            self.stats.max_lag = healthy_lags.iter().copied().max().unwrap_or(0);
            self.stats.min_lag = healthy_lags.iter().copied().min().unwrap_or(0);

            let total_latency: Duration = healthy_latencies.iter().sum();
            self.stats.average_latency = total_latency / healthy_latencies.len() as u32;
        } else {
            self.stats.average_lag = 0.0;
            self.stats.max_lag = 0;
            self.stats.min_lag = 0;
            self.stats.average_latency = Duration::from_millis(0);
        }
    }

    /// Runs the periodic maintenance loop: every 30 seconds, performs a
    /// staleness-based health check and logs the current replication stats.
    /// This future loops forever; spawn it as a background task.
    pub async fn run_maintenance(&mut self) {
        const HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(30);
        const STALE_THRESHOLD: Duration = Duration::from_secs(60);

        loop {
            sleep(HEALTH_CHECK_INTERVAL).await;

            self.health_check(STALE_THRESHOLD).await;

            if self.stats.total_replicas > 0 {
                tracing::debug!(
                    "Replication stats: {}/{} healthy, avg lag: {:.1}, max lag: {}",
                    self.stats.healthy_replicas,
                    self.stats.total_replicas,
                    self.stats.average_lag,
                    self.stats.max_lag
                );
            }
        }
    }
}

/// Errors that can occur during replication.
#[derive(Debug, thiserror::Error)]
pub enum ReplicationError {
    #[error("Insufficient replicas: need {required}, have {available}")]
    InsufficientReplicas { required: usize, available: usize },

    #[error("Replica {node_id} is unhealthy")]
    UnhealthyReplica { node_id: OxirsNodeId },

    #[error("Replication timeout after {timeout:?}")]
    Timeout { timeout: Duration },

    #[error("Network error: {message}")]
    Network { message: String },

    #[error("Serialization error: {message}")]
    Serialization { message: String },
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_replication_strategy_default() {
        let strategy = ReplicationStrategy::default();
        assert_eq!(strategy, ReplicationStrategy::RaftConsensus);
    }

    #[test]
    fn test_replica_info_creation() {
        let replica = ReplicaInfo::new(1, "127.0.0.1:8080".to_string());

        assert_eq!(replica.node_id, 1);
        assert_eq!(replica.address, "127.0.0.1:8080");
        assert_eq!(replica.last_applied_index, 0);
        assert!(replica.is_healthy);
        assert_eq!(replica.replication_lag, 0);
        assert_eq!(replica.latency, Duration::from_millis(0));
    }

    #[test]
    fn test_replica_info_staleness() {
        let replica = ReplicaInfo::new(1, "127.0.0.1:8080".to_string());

        // A freshly created replica is not stale under a generous threshold.
        assert!(!replica.is_stale(Duration::from_secs(10)));

        // After even a tiny delay, a 1 ns threshold is exceeded.
        std::thread::sleep(Duration::from_micros(1));

        assert!(replica.is_stale(Duration::from_nanos(1)));
    }

    #[test]
    fn test_replica_info_update_health() {
        let mut replica = ReplicaInfo::new(1, "127.0.0.1:8080".to_string());

        replica.update_health(false);
        assert!(!replica.is_healthy);

        replica.update_health(true);
        assert!(replica.is_healthy);
    }

    #[test]
    fn test_replication_manager_creation() {
        let manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        assert_eq!(manager.strategy, ReplicationStrategy::Synchronous);
        assert_eq!(manager.local_node_id, 1);
        assert!(manager.replicas.is_empty());
        assert_eq!(manager.stats.total_replicas, 0);
    }

    #[test]
    fn test_replication_manager_with_raft_consensus() {
        let manager = ReplicationManager::with_raft_consensus(1);

        assert_eq!(manager.strategy, ReplicationStrategy::RaftConsensus);
        assert_eq!(manager.local_node_id, 1);
    }

    #[test]
    fn test_replication_manager_add_replica() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        // A new node is added and reported as new.
        assert!(manager.add_replica(2, "127.0.0.1:8081".to_string()));
        assert_eq!(manager.replicas.len(), 1);
        assert!(manager.replicas.contains_key(&2));

        // Re-adding the same node is not reported as new.
        assert!(!manager.add_replica(2, "127.0.0.1:8081".to_string()));
        assert_eq!(manager.replicas.len(), 1);

        // The local node is rejected.
        assert!(!manager.add_replica(1, "127.0.0.1:8080".to_string()));
        assert_eq!(manager.replicas.len(), 1);
    }

    #[test]
    fn test_replication_manager_remove_replica() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());
        assert_eq!(manager.replicas.len(), 2);

        // Removing a known replica succeeds.
        assert!(manager.remove_replica(2));
        assert_eq!(manager.replicas.len(), 1);
        assert!(!manager.replicas.contains_key(&2));

        // Removing an unknown replica is a no-op.
        assert!(!manager.remove_replica(4));
        assert_eq!(manager.replicas.len(), 1);
    }

    #[test]
    fn test_replication_manager_get_healthy_replicas() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());

        manager.update_replica_health(3, false);

        let healthy_replicas = manager.get_healthy_replicas();
        assert_eq!(healthy_replicas.len(), 1);
        assert_eq!(healthy_replicas[0].node_id, 2);
    }

    #[test]
    fn test_replication_manager_update_replica_health() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        manager.add_replica(2, "127.0.0.1:8081".to_string());

        // Updating a known replica succeeds and the flag is recorded.
        assert!(manager.update_replica_health(2, false));
        let replica = manager.get_replica(2).unwrap();
        assert!(!replica.is_healthy);

        // Updating an unknown replica reports failure.
        assert!(!manager.update_replica_health(3, true));
    }

    #[test]
    fn test_replication_manager_update_replica_lag() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        manager.add_replica(2, "127.0.0.1:8081".to_string());

        manager.update_replica_lag(2, 50, 100);
        let replica = manager.get_replica(2).unwrap();
        assert_eq!(replica.last_applied_index, 50);
        assert_eq!(replica.replication_lag, 50);
    }

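    // Added coverage (not part of the original suite): verifies the aggregate
    // statistics derived by update_stats() after a pair of lag updates.
    #[test]
    fn test_replication_manager_stats_after_lag_updates() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);
        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());

        // Replica 2 is 10 entries behind; replica 3 is 30 entries behind.
        manager.update_replica_lag(2, 90, 100);
        manager.update_replica_lag(3, 70, 100);

        let stats = manager.get_stats();
        assert_eq!(stats.total_replicas, 2);
        assert_eq!(stats.healthy_replicas, 2);
        assert_eq!(stats.average_lag, 20.0);
        assert_eq!(stats.max_lag, 30);
        assert_eq!(stats.min_lag, 10);
    }
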
    #[tokio::test]
    async fn test_replication_manager_health_check() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());

        // Both replicas start out healthy.
        assert_eq!(manager.get_healthy_replicas().len(), 2);

        // With a 1 ns staleness threshold, every replica is marked unhealthy.
        manager.health_check(Duration::from_nanos(1)).await;
        assert_eq!(manager.get_healthy_replicas().len(), 0);
    }

    #[test]
    fn test_replication_manager_strategy_change() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);

        assert_eq!(manager.get_strategy(), &ReplicationStrategy::Synchronous);

        manager.set_strategy(ReplicationStrategy::Asynchronous);
        assert_eq!(manager.get_strategy(), &ReplicationStrategy::Asynchronous);
    }

    #[test]
    fn test_replication_manager_health_status() {
        let mut manager =
            ReplicationManager::new(ReplicationStrategy::SemiSynchronous { min_replicas: 2 }, 1);

        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());
        manager.add_replica(4, "127.0.0.1:8083".to_string());

        // Three healthy replicas easily satisfy min_replicas = 2.
        assert!(manager.is_replication_healthy());

        // Two healthy replicas still satisfy the minimum.
        manager.update_replica_health(4, false);
        assert!(manager.is_replication_healthy());

        // One healthy replica falls below the minimum.
        manager.update_replica_health(3, false);
        assert!(!manager.is_replication_healthy());
    }

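    // Added coverage (not part of the original suite): asynchronous
    // replication never gates on replica health, so the manager should
    // report healthy even when every replica is down.
    #[test]
    fn test_replication_manager_asynchronous_always_healthy() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Asynchronous, 1);
        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.update_replica_health(2, false);

        assert!(manager.is_replication_healthy());
    }
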
    #[test]
    fn test_replication_manager_required_replica_count() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::Synchronous, 1);
        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());

        // Synchronous: every replica must acknowledge.
        assert_eq!(manager.required_replica_count(), 2);

        // Asynchronous: no replica needs to acknowledge.
        manager.set_strategy(ReplicationStrategy::Asynchronous);
        assert_eq!(manager.required_replica_count(), 0);

        // Semi-synchronous: exactly min_replicas acknowledgements.
        manager.set_strategy(ReplicationStrategy::SemiSynchronous { min_replicas: 1 });
        assert_eq!(manager.required_replica_count(), 1);

        // Raft: the majority of 3 nodes is 2, minus the local node's own vote.
        manager.set_strategy(ReplicationStrategy::RaftConsensus);
        assert_eq!(manager.required_replica_count(), 1);
    }

    #[test]
    fn test_replication_manager_raft_consensus_health() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::RaftConsensus, 1);

        // A single-node cluster is trivially healthy.
        assert!(manager.is_replication_healthy());

        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());

        // 3 of 3 nodes healthy: majority held.
        assert!(manager.is_replication_healthy());

        // 2 of 3 nodes healthy: still a majority.
        manager.update_replica_health(3, false);
        assert!(manager.is_replication_healthy());

        // 1 of 3 nodes healthy: majority lost.
        manager.update_replica_health(2, false);
        assert!(!manager.is_replication_healthy());
    }

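    // Added coverage (not part of the original suite): a five-node cluster
    // (local node plus four replicas) has a majority of 3, so it tolerates
    // two unhealthy replicas but not three.
    #[test]
    fn test_replication_manager_raft_quorum_five_nodes() {
        let mut manager = ReplicationManager::new(ReplicationStrategy::RaftConsensus, 1);
        manager.add_replica(2, "127.0.0.1:8081".to_string());
        manager.add_replica(3, "127.0.0.1:8082".to_string());
        manager.add_replica(4, "127.0.0.1:8083".to_string());
        manager.add_replica(5, "127.0.0.1:8084".to_string());

        // Majority is 3; the local node supplies one vote, so 2 remote acks.
        assert_eq!(manager.required_replica_count(), 2);

        manager.update_replica_health(4, false);
        manager.update_replica_health(5, false);
        assert!(manager.is_replication_healthy());

        manager.update_replica_health(3, false);
        assert!(!manager.is_replication_healthy());
    }
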
    #[test]
    fn test_replication_stats_default() {
        let stats = ReplicationStats::default();
        assert_eq!(stats.total_replicas, 0);
        assert_eq!(stats.healthy_replicas, 0);
        assert_eq!(stats.average_lag, 0.0);
        assert_eq!(stats.max_lag, 0);
        assert_eq!(stats.min_lag, 0);
        assert_eq!(stats.average_latency, Duration::from_millis(0));
        assert_eq!(stats.replication_throughput, 0.0);
    }

    #[test]
    fn test_replication_error_display() {
        let err = ReplicationError::InsufficientReplicas {
            required: 3,
            available: 1,
        };
        assert!(err
            .to_string()
            .contains("Insufficient replicas: need 3, have 1"));

        let err = ReplicationError::UnhealthyReplica { node_id: 42 };
        assert!(err.to_string().contains("Replica 42 is unhealthy"));

        let err = ReplicationError::Timeout {
            timeout: Duration::from_secs(5),
        };
        assert!(err.to_string().contains("Replication timeout after 5s"));

        let err = ReplicationError::Network {
            message: "connection failed".to_string(),
        };
        assert!(err.to_string().contains("Network error: connection failed"));

        let err = ReplicationError::Serialization {
            message: "json error".to_string(),
        };
        assert!(err.to_string().contains("Serialization error: json error"));
    }
}