1use crate::error::{ClusterError, Result};
7use crate::worker_pool::WorkerId;
8use dashmap::DashMap;
9use serde::{Deserialize, Serialize};
10use std::collections::HashSet;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::time::Instant;
14use tracing::{debug, info, warn};
15
/// Cheaply-cloneable handle to the cluster's replication state.
///
/// All clones share the same underlying tables via `Arc`, so a clone
/// observes every replica/health update made through any other clone.
#[derive(Clone)]
pub struct ReplicationManager {
    // Shared state; see `ReplicationInner` for the individual tables.
    inner: Arc<ReplicationInner>,
}
21
/// Shared state behind [`ReplicationManager`].
struct ReplicationInner {
    /// Replica sets keyed by data id.
    replicas: DashMap<String, ReplicaSet>,

    /// Last reported health per worker; consulted for placement decisions.
    worker_health: DashMap<WorkerId, WorkerHealth>,

    /// Configuration supplied at construction; never mutated afterwards.
    config: ReplicationConfig,

    /// Monotonic counters updated with relaxed atomics.
    /// NOTE(review): this struct is already behind an `Arc`, so the inner
    /// `Arc` here looks redundant — confirm before simplifying.
    stats: Arc<ReplicationStats>,
}
35
/// Tunables for replica placement and quorum checks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    /// Target number of replicas per data item.
    pub replication_factor: usize,

    /// Minimum workers required before `create_replicas` will place anything.
    pub min_replicas: usize,

    /// Upper bound on replicas.
    /// NOTE(review): not enforced anywhere in this file — verify intended use.
    pub max_replicas: usize,

    /// Healthy replicas required for a read quorum.
    pub read_quorum: usize,

    /// Healthy replicas required for a write quorum.
    pub write_quorum: usize,

    /// Strategy used when choosing workers for new replicas.
    pub placement_strategy: PlacementStrategy,

    /// When false, `check_rereplication` is a no-op.
    pub auto_rereplication: bool,

    /// Interval between re-replication sweeps.
    /// NOTE(review): not read in this file — presumably consumed by a
    /// background task elsewhere; confirm.
    pub rereplication_interval: std::time::Duration,

    /// Rack-awareness toggle.
    /// NOTE(review): not read in this file; placement uses
    /// `placement_strategy` instead — confirm whether this flag is still live.
    pub rack_aware: bool,
}
66
67impl Default for ReplicationConfig {
68 fn default() -> Self {
69 Self {
70 replication_factor: 3,
71 min_replicas: 2,
72 max_replicas: 5,
73 read_quorum: 2,
74 write_quorum: 2,
75 placement_strategy: PlacementStrategy::Random,
76 auto_rereplication: true,
77 rereplication_interval: std::time::Duration::from_secs(60),
78 rack_aware: false,
79 }
80 }
81}
82
/// How workers are chosen when placing new replicas.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PlacementStrategy {
    /// NOTE(review): the current implementation takes the first candidates
    /// in the order given rather than sampling randomly — confirm intent.
    Random,

    /// Prefer workers with the lowest reported load.
    LeastLoaded,

    /// Spread replicas across distinct racks where possible.
    RackAware,

    /// Spread replicas across distinct zones where possible.
    ZoneAware,
}
98
/// All replicas of a single data item, plus set-level metadata.
#[derive(Debug, Clone)]
pub struct ReplicaSet {
    /// Identifier of the replicated data item.
    pub data_id: String,

    /// Per-worker replica records.
    pub replicas: Vec<Replica>,

    /// Designated primary replica holder.
    /// NOTE(review): never assigned in this file (always `None` at
    /// creation) — confirm who elects the primary.
    pub primary: Option<WorkerId>,

    /// Version of the set; new replicas inherit this value.
    pub version: u64,

    /// When the set was created.
    pub created_at: Instant,

    /// When the membership was last changed.
    pub last_updated: Instant,

    /// Size of the replicated data, in bytes.
    pub size_bytes: u64,
}
123
/// One copy of a data item on a specific worker.
#[derive(Debug, Clone)]
pub struct Replica {
    /// Worker hosting this copy.
    pub worker_id: WorkerId,

    /// Current lifecycle state of the copy.
    pub status: ReplicaStatus,

    /// Data version this copy holds.
    pub version: u64,

    /// When this replica record was created.
    pub created_at: Instant,

    /// Last time the copy was verified, if ever.
    pub last_verified: Option<Instant>,

    /// Rack label captured from the worker's health report at creation.
    pub rack_id: Option<String>,

    /// Zone label captured from the worker's health report at creation.
    pub zone_id: Option<String>,
}
148
/// Lifecycle state of a single replica.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicaStatus {
    /// Up to date and counted towards quorums.
    Healthy,

    /// Copy in progress.
    /// NOTE(review): never set in this file — presumably set by the
    /// component executing re-replication; confirm.
    Replicating,

    /// Behind the current version.
    /// NOTE(review): never set in this file — confirm the producer.
    Stale,

    /// Unreachable; excluded from quorums (see `mark_replica_failed`).
    Unavailable,
}
164
/// Latest self-reported health of a worker, used for placement.
#[derive(Debug, Clone)]
pub struct WorkerHealth {
    /// Worker this report describes.
    pub worker_id: WorkerId,

    /// Liveness flag.
    /// NOTE(review): not consulted by the placement code in this file —
    /// confirm whether unhealthy workers should be filtered out.
    pub healthy: bool,

    /// Load metric; lower is preferred by `LeastLoaded` placement.
    /// Assumed to be in [0, 1] since unknown workers default to 1.0 —
    /// TODO confirm.
    pub load: f64,

    /// Free storage, in bytes. NOTE(review): unused in this file.
    pub available_storage: u64,

    /// Rack label used by rack-aware placement.
    pub rack_id: Option<String>,

    /// Zone label used by zone-aware placement.
    pub zone_id: Option<String>,

    /// When this report was produced/recorded.
    pub last_updated: Instant,
}
189
/// Internal monotonic counters, updated with relaxed atomics.
#[derive(Debug, Default)]
struct ReplicationStats {
    /// Replicas added via `create_replicas` / `add_replica`.
    replicas_created: AtomicU64,

    /// Replica removals via `remove_replica`.
    replicas_removed: AtomicU64,

    /// Re-replication placements scheduled by `check_rereplication`.
    rereplications: AtomicU64,

    /// Read-quorum checks that succeeded.
    quorum_reads: AtomicU64,

    /// Write-quorum checks that succeeded.
    quorum_writes: AtomicU64,

    /// Quorum checks (read or write) that failed.
    quorum_failures: AtomicU64,
}
211
212impl ReplicationManager {
213 pub fn new(config: ReplicationConfig) -> Self {
215 Self {
216 inner: Arc::new(ReplicationInner {
217 replicas: DashMap::new(),
218 worker_health: DashMap::new(),
219 config,
220 stats: Arc::new(ReplicationStats::default()),
221 }),
222 }
223 }
224
    /// Convenience constructor using [`ReplicationConfig::default`].
    pub fn with_defaults() -> Self {
        Self::new(ReplicationConfig::default())
    }
229
230 pub fn create_replicas(
232 &self,
233 data_id: String,
234 size_bytes: u64,
235 available_workers: &[WorkerId],
236 ) -> Result<ReplicaSet> {
237 if available_workers.len() < self.inner.config.min_replicas {
238 return Err(ClusterError::ReplicaPlacementError(format!(
239 "Not enough workers: need {}, have {}",
240 self.inner.config.min_replicas,
241 available_workers.len()
242 )));
243 }
244
245 let selected_workers =
247 self.select_replica_workers(available_workers, self.inner.config.replication_factor)?;
248
249 let now = Instant::now();
250 let mut replicas = Vec::new();
251
252 for worker_id in selected_workers {
253 let health = self.inner.worker_health.get(&worker_id);
254
255 let replica = Replica {
256 worker_id,
257 status: ReplicaStatus::Healthy,
258 version: 1,
259 created_at: now,
260 last_verified: Some(now),
261 rack_id: health.as_ref().and_then(|h| h.rack_id.clone()),
262 zone_id: health.as_ref().and_then(|h| h.zone_id.clone()),
263 };
264
265 replicas.push(replica);
266
267 self.inner
268 .stats
269 .replicas_created
270 .fetch_add(1, Ordering::Relaxed);
271 }
272
273 let replica_set = ReplicaSet {
274 data_id: data_id.clone(),
275 replicas,
276 primary: None, version: 1,
278 created_at: now,
279 last_updated: now,
280 size_bytes,
281 };
282
283 self.inner.replicas.insert(data_id, replica_set.clone());
284
285 Ok(replica_set)
286 }
287
288 fn select_replica_workers(
290 &self,
291 available: &[WorkerId],
292 count: usize,
293 ) -> Result<Vec<WorkerId>> {
294 let count = count.min(available.len());
295
296 match self.inner.config.placement_strategy {
297 PlacementStrategy::Random => {
298 Ok(available.iter().take(count).copied().collect())
300 }
301 PlacementStrategy::LeastLoaded => {
302 let mut workers_with_load: Vec<_> = available
304 .iter()
305 .map(|&id| {
306 let load = self
307 .inner
308 .worker_health
309 .get(&id)
310 .map(|h| h.load)
311 .unwrap_or(1.0);
312 (id, load)
313 })
314 .collect();
315
316 workers_with_load
317 .sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
318
319 Ok(workers_with_load
320 .into_iter()
321 .take(count)
322 .map(|(id, _)| id)
323 .collect())
324 }
325 PlacementStrategy::RackAware => {
326 self.select_rack_aware(available, count)
328 }
329 PlacementStrategy::ZoneAware => {
330 self.select_zone_aware(available, count)
332 }
333 }
334 }
335
336 fn select_rack_aware(&self, available: &[WorkerId], count: usize) -> Result<Vec<WorkerId>> {
338 let mut selected = Vec::new();
339 let mut used_racks = HashSet::new();
340
341 for &worker_id in available {
343 if selected.len() >= count {
344 break;
345 }
346
347 if let Some(health) = self.inner.worker_health.get(&worker_id) {
348 if let Some(rack_id) = &health.rack_id {
349 if !used_racks.contains(rack_id) {
350 selected.push(worker_id);
351 used_racks.insert(rack_id.clone());
352 }
353 }
354 }
355 }
356
357 for &worker_id in available {
359 if selected.len() >= count {
360 break;
361 }
362 if !selected.contains(&worker_id) {
363 selected.push(worker_id);
364 }
365 }
366
367 Ok(selected)
368 }
369
370 fn select_zone_aware(&self, available: &[WorkerId], count: usize) -> Result<Vec<WorkerId>> {
372 let mut selected = Vec::new();
373 let mut used_zones = HashSet::new();
374
375 for &worker_id in available {
377 if selected.len() >= count {
378 break;
379 }
380
381 if let Some(health) = self.inner.worker_health.get(&worker_id) {
382 if let Some(zone_id) = &health.zone_id {
383 if !used_zones.contains(zone_id) {
384 selected.push(worker_id);
385 used_zones.insert(zone_id.clone());
386 }
387 }
388 }
389 }
390
391 for &worker_id in available {
393 if selected.len() >= count {
394 break;
395 }
396 if !selected.contains(&worker_id) {
397 selected.push(worker_id);
398 }
399 }
400
401 Ok(selected)
402 }
403
404 pub fn get_replicas(&self, data_id: &str) -> Option<ReplicaSet> {
406 self.inner.replicas.get(data_id).map(|r| r.clone())
407 }
408
409 pub async fn quorum_read(&self, data_id: &str) -> Result<Vec<WorkerId>> {
411 let replica_set = self
412 .get_replicas(data_id)
413 .ok_or_else(|| ClusterError::DataNotAvailable(data_id.to_string()))?;
414
415 let healthy_replicas: Vec<_> = replica_set
416 .replicas
417 .iter()
418 .filter(|r| r.status == ReplicaStatus::Healthy)
419 .map(|r| r.worker_id)
420 .collect();
421
422 if healthy_replicas.len() < self.inner.config.read_quorum {
423 self.inner
424 .stats
425 .quorum_failures
426 .fetch_add(1, Ordering::Relaxed);
427
428 return Err(ClusterError::QuorumNotReached {
429 required: self.inner.config.read_quorum,
430 actual: healthy_replicas.len(),
431 });
432 }
433
434 self.inner
435 .stats
436 .quorum_reads
437 .fetch_add(1, Ordering::Relaxed);
438
439 Ok(healthy_replicas
440 .into_iter()
441 .take(self.inner.config.read_quorum)
442 .collect())
443 }
444
445 pub async fn quorum_write(&self, data_id: &str) -> Result<Vec<WorkerId>> {
447 let replica_set = self
448 .get_replicas(data_id)
449 .ok_or_else(|| ClusterError::DataNotAvailable(data_id.to_string()))?;
450
451 let healthy_replicas: Vec<_> = replica_set
452 .replicas
453 .iter()
454 .filter(|r| r.status == ReplicaStatus::Healthy)
455 .map(|r| r.worker_id)
456 .collect();
457
458 if healthy_replicas.len() < self.inner.config.write_quorum {
459 self.inner
460 .stats
461 .quorum_failures
462 .fetch_add(1, Ordering::Relaxed);
463
464 return Err(ClusterError::QuorumNotReached {
465 required: self.inner.config.write_quorum,
466 actual: healthy_replicas.len(),
467 });
468 }
469
470 self.inner
471 .stats
472 .quorum_writes
473 .fetch_add(1, Ordering::Relaxed);
474
475 Ok(healthy_replicas
476 .into_iter()
477 .take(self.inner.config.write_quorum)
478 .collect())
479 }
480
    /// Record (or replace) the latest health report for a worker.
    pub fn update_worker_health(&self, health: WorkerHealth) {
        self.inner.worker_health.insert(health.worker_id, health);
    }
485
486 pub fn mark_replica_failed(&self, data_id: &str, worker_id: WorkerId) -> Result<()> {
488 if let Some(mut replica_set) = self.inner.replicas.get_mut(data_id) {
489 for replica in &mut replica_set.replicas {
490 if replica.worker_id == worker_id {
491 replica.status = ReplicaStatus::Unavailable;
492 warn!(
493 "Marked replica as unavailable: {} on {}",
494 data_id, worker_id
495 );
496 break;
497 }
498 }
499 }
500
501 Ok(())
502 }
503
504 pub fn check_rereplication(&self, available_workers: &[WorkerId]) -> Vec<(String, WorkerId)> {
506 if !self.inner.config.auto_rereplication {
507 return Vec::new();
508 }
509
510 let mut rereplications = Vec::new();
511
512 for entry in self.inner.replicas.iter() {
513 let data_id = entry.key().clone();
514 let replica_set = entry.value();
515
516 let healthy_count = replica_set
517 .replicas
518 .iter()
519 .filter(|r| r.status == ReplicaStatus::Healthy)
520 .count();
521
522 if healthy_count < self.inner.config.replication_factor {
523 let needed = self.inner.config.replication_factor - healthy_count;
525
526 let existing: HashSet<_> =
528 replica_set.replicas.iter().map(|r| r.worker_id).collect();
529
530 let candidates: Vec<_> = available_workers
531 .iter()
532 .filter(|w| !existing.contains(w))
533 .copied()
534 .collect();
535
536 match self.select_replica_workers(&candidates, needed) {
537 Ok(new_workers) => {
538 for worker_id in new_workers {
539 rereplications.push((data_id.clone(), worker_id));
540
541 self.inner
542 .stats
543 .rereplications
544 .fetch_add(1, Ordering::Relaxed);
545 }
546 }
547 Err(e) => {
548 warn!(
549 "Failed to select workers for re-replication of {}: {}",
550 data_id, e
551 );
552 }
553 }
554 }
555 }
556
557 if !rereplications.is_empty() {
558 info!("Scheduled {} re-replications", rereplications.len());
559 }
560
561 rereplications
562 }
563
564 pub fn add_replica(&self, data_id: &str, worker_id: WorkerId) -> Result<()> {
566 if let Some(mut replica_set) = self.inner.replicas.get_mut(data_id) {
567 let health = self.inner.worker_health.get(&worker_id);
568
569 let replica = Replica {
570 worker_id,
571 status: ReplicaStatus::Healthy,
572 version: replica_set.version,
573 created_at: Instant::now(),
574 last_verified: Some(Instant::now()),
575 rack_id: health.as_ref().and_then(|h| h.rack_id.clone()),
576 zone_id: health.as_ref().and_then(|h| h.zone_id.clone()),
577 };
578
579 replica_set.replicas.push(replica);
580 replica_set.last_updated = Instant::now();
581
582 self.inner
583 .stats
584 .replicas_created
585 .fetch_add(1, Ordering::Relaxed);
586
587 debug!("Added replica for {} on worker {}", data_id, worker_id);
588 }
589
590 Ok(())
591 }
592
593 pub fn remove_replica(&self, data_id: &str, worker_id: WorkerId) -> Result<()> {
595 if let Some(mut replica_set) = self.inner.replicas.get_mut(data_id) {
596 replica_set.replicas.retain(|r| r.worker_id != worker_id);
597 replica_set.last_updated = Instant::now();
598
599 self.inner
600 .stats
601 .replicas_removed
602 .fetch_add(1, Ordering::Relaxed);
603
604 debug!("Removed replica for {} from worker {}", data_id, worker_id);
605 }
606
607 Ok(())
608 }
609
610 pub fn get_statistics(&self) -> ReplicationStatistics {
612 ReplicationStatistics {
613 replicas_created: self.inner.stats.replicas_created.load(Ordering::Relaxed),
614 replicas_removed: self.inner.stats.replicas_removed.load(Ordering::Relaxed),
615 rereplications: self.inner.stats.rereplications.load(Ordering::Relaxed),
616 quorum_reads: self.inner.stats.quorum_reads.load(Ordering::Relaxed),
617 quorum_writes: self.inner.stats.quorum_writes.load(Ordering::Relaxed),
618 quorum_failures: self.inner.stats.quorum_failures.load(Ordering::Relaxed),
619 total_replica_sets: self.inner.replicas.len(),
620 total_workers: self.inner.worker_health.len(),
621 }
622 }
623}
624
/// Point-in-time, serializable view of the replication counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStatistics {
    /// Total replicas ever created.
    pub replicas_created: u64,

    /// Total replica removals.
    pub replicas_removed: u64,

    /// Total re-replication placements scheduled.
    pub rereplications: u64,

    /// Successful read-quorum checks.
    pub quorum_reads: u64,

    /// Successful write-quorum checks.
    pub quorum_writes: u64,

    /// Failed quorum checks (read and write combined).
    pub quorum_failures: u64,

    /// Replica sets currently tracked.
    pub total_replica_sets: usize,

    /// Workers with a recorded health report.
    pub total_workers: usize,
}
652
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A fresh manager starts with all counters at zero.
    #[test]
    fn test_replication_manager_creation() {
        let mgr = ReplicationManager::with_defaults();
        let stats = mgr.get_statistics();
        assert_eq!(stats.replicas_created, 0);
    }

    /// With 5 workers and the default replication_factor of 3, exactly
    /// three replicas are placed.
    #[test]
    fn test_create_replicas() {
        let mgr = ReplicationManager::with_defaults();

        let workers: Vec<_> = (0..5).map(|_| WorkerId::new()).collect();

        let result = mgr.create_replicas("data1".to_string(), 1000, &workers);
        assert!(result.is_ok());

        if let Ok(replica_set) = result {
            assert_eq!(replica_set.replicas.len(), 3);
        }
    }

    /// Both quorums (default 2) are satisfiable while all 3 replicas are
    /// healthy.
    #[tokio::test]
    async fn test_quorum_operations() {
        let mgr = ReplicationManager::with_defaults();

        let workers: Vec<_> = (0..5).map(|_| WorkerId::new()).collect();

        mgr.create_replicas("data1".to_string(), 1000, &workers)
            .ok();

        let read_result = mgr.quorum_read("data1").await;
        assert!(read_result.is_ok());

        let write_result = mgr.quorum_write("data1").await;
        assert!(write_result.is_ok());
    }

    /// Failing one of 3 replicas drops healthy count below the
    /// replication factor, so a re-replication must be scheduled.
    #[test]
    fn test_rereplication() {
        let mgr = ReplicationManager::with_defaults();

        let workers: Vec<_> = (0..5).map(|_| WorkerId::new()).collect();

        mgr.create_replicas("data1".to_string(), 1000, &workers)
            .ok();

        mgr.mark_replica_failed("data1", workers[0]).ok();

        let rereplications = mgr.check_rereplication(&workers);
        assert!(!rereplications.is_empty());
    }
}