1//! # Multi-Region Replication
2//!
3//! Advanced multi-region replication system for OxiRS Stream providing global data consistency,
4//! failover capabilities, and optimized cross-region communication.
5
6use crate::StreamEvent;
7use anyhow::{anyhow, Result};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::{Mutex, RwLock, Semaphore};
15use tokio::time;
16use tracing::{error, info, warn};
17use uuid::Uuid;
18
/// Static configuration for a single region in the replication topology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionConfig {
    /// Region identifier; unique key within the topology (e.g. "us-west-1")
    pub region_id: String,
    /// Region name (human-readable)
    pub region_name: String,
    /// Geographic location of the region
    pub location: GeographicLocation,
    /// Network endpoints for this region
    pub endpoints: Vec<RegionEndpoint>,
    /// Replication priority (higher is more preferred)
    pub priority: u8,
    /// Whether this region is active for writes
    pub is_write_active: bool,
    /// Whether this region is active for reads
    pub is_read_active: bool,
    /// Replication mode for this region
    pub replication_mode: ReplicationMode,
    /// Expected network latency to other regions, keyed by region id (ms)
    pub latency_map: HashMap<String, u64>,
}
41
/// Geographic location information for a region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeographicLocation {
    /// Country code (ISO 3166-1 alpha-2)
    pub country: String,
    /// Region/state/province name
    pub region: String,
    /// City name
    pub city: String,
    /// Latitude in decimal degrees
    pub latitude: f64,
    /// Longitude in decimal degrees
    pub longitude: f64,
    /// Availability zone (if the provider exposes one)
    pub availability_zone: Option<String>,
}
58
/// A single network endpoint belonging to a region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionEndpoint {
    /// Endpoint URL
    pub url: String,
    /// Role of this endpoint within the region
    pub endpoint_type: EndpointType,
    /// Whether this endpoint is currently healthy
    pub is_healthy: bool,
    /// Last health check timestamp; `None` if never checked
    pub last_health_check: Option<DateTime<Utc>>,
    /// Authentication configuration; `None` for unauthenticated endpoints
    pub auth: Option<EndpointAuth>,
}
73
/// Role an endpoint plays within its region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EndpointType {
    /// Primary streaming endpoint
    Primary,
    /// Secondary/backup endpoint
    Secondary,
    /// Administrative endpoint
    Admin,
    /// Health check endpoint
    HealthCheck,
}
86
/// Endpoint authentication configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndpointAuth {
    /// Authentication scheme name (e.g. "bearer", "basic")
    pub auth_type: String,
    /// Credential key/value pairs. NOTE(review): the original comment says
    /// these are encrypted, but nothing in this module enforces that — confirm
    /// with the component that populates this struct.
    pub credentials: HashMap<String, String>,
}
95
/// How writes are propagated to replica regions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationMode {
    /// Synchronous replication (wait for all regions before acknowledging)
    Synchronous,
    /// Asynchronous replication (fire and forget)
    Asynchronous,
    /// Semi-synchronous: wait until at least `min_replicas` regions acknowledge
    SemiSynchronous { min_replicas: usize },
    /// Leader-follower: only `leader_region` accepts writes
    LeaderFollower { leader_region: String },
    /// Active-active (all regions can write)
    ActiveActive,
}
110
/// Top-level replication configuration for [`MultiRegionReplicationManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    /// Which events are replicated, and to where
    pub strategy: ReplicationStrategy,
    /// How concurrent conflicting writes are resolved
    pub conflict_resolution: ConflictResolution,
    /// Maximum tolerated replication lag (ms)
    pub max_lag_ms: u64,
    /// Timeout for a single replication attempt
    pub replication_timeout: Duration,
    /// Enable compression for cross-region traffic
    pub enable_compression: bool,
    /// Number of events sent per replication batch
    pub batch_size: usize,
    /// Interval between region health checks
    pub health_check_interval: Duration,
    /// How long to wait before failing over away from an unhealthy region
    pub failover_timeout: Duration,
}
131
/// Strategy that decides which events are replicated to which regions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationStrategy {
    /// Replicate all events to all regions
    FullReplication,
    /// Replicate only events whose variant name appears in `event_types`
    SelectiveReplication { event_types: HashSet<String> },
    /// Partition-based replication: targets depend on the event's partition
    PartitionBased {
        partition_strategy: PartitionStrategy,
    },
    /// Geography-based replication: regions grouped by geographic area
    GeographyBased {
        region_groups: HashMap<String, Vec<String>>,
    },
}
148
/// How events are assigned to partitions for partition-based replication.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartitionStrategy {
    /// Hash-based partitioning over the named key
    Hash { hash_key: String },
    /// Range-based partitioning over explicit key ranges
    Range { ranges: Vec<PartitionRange> },
    /// Custom partitioning logic resolved by name at runtime
    Custom { strategy_name: String },
}
159
/// A contiguous key range mapped to a set of target regions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionRange {
    /// Inclusive lower bound of the key range — TODO confirm inclusivity with callers
    pub start: String,
    /// Upper bound of the key range — TODO confirm inclusivity with callers
    pub end: String,
    /// Regions that hold events falling into this range
    pub regions: Vec<String>,
}
167
/// Strategy used to resolve concurrent conflicting writes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolution {
    /// Last write wins (event-timestamp based)
    LastWriteWins,
    /// First write wins
    FirstWriteWins,
    /// Resolve by region priority; earlier entries in `priority_order` win
    RegionPriority { priority_order: Vec<String> },
    /// Custom conflict resolution resolved by name at runtime
    Custom { resolver_name: String },
    /// Manual resolution: conflicts are queued for an operator to resolve
    Manual,
}
182
/// A stream event paired with the metadata describing its replication.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicatedEvent {
    /// The original event being replicated
    pub event: StreamEvent,
    /// Replication bookkeeping (id, source, targets, vector clock, …)
    pub replication_metadata: ReplicationMetadata,
}
191
/// Bookkeeping attached to every replicated event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationMetadata {
    /// Unique id for this replication attempt
    pub replication_id: Uuid,
    /// Region that originated the replication
    pub source_region: String,
    /// Regions this event is being replicated to
    pub target_regions: Vec<String>,
    /// When replication was initiated
    pub replication_timestamp: DateTime<Utc>,
    /// Per-region delivery status, keyed by region id
    pub region_status: HashMap<String, ReplicationStatus>,
    /// Vector clock snapshot used for causal ordering and conflict detection
    pub vector_clock: VectorClock,
    /// Conflict details, populated only if a conflict was detected
    pub conflict_info: Option<ConflictInfo>,
}
210
/// Delivery status of a replicated event for one target region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationStatus {
    /// Replication not yet started
    Pending,
    /// Successfully replicated at `timestamp`
    Success { timestamp: DateTime<Utc> },
    /// Replication failed with the recorded error
    Failed {
        error: String,
        timestamp: DateTime<Utc>,
    },
    /// Replication started at `started_at` and is still in flight
    InProgress { started_at: DateTime<Utc> },
}
226
/// Vector clock used to establish causal ordering of events across regions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorClock {
    /// Logical clock value per region, keyed by region id; absent == 0
    pub clocks: HashMap<String, u64>,
}
233
234impl Default for VectorClock {
235    fn default() -> Self {
236        Self::new()
237    }
238}
239
240impl VectorClock {
241    /// Create a new vector clock
242    pub fn new() -> Self {
243        Self {
244            clocks: HashMap::new(),
245        }
246    }
247
248    /// Increment clock for a region
249    pub fn increment(&mut self, region: &str) {
250        let current = self.clocks.get(region).unwrap_or(&0);
251        self.clocks.insert(region.to_string(), current + 1);
252    }
253
254    /// Update clock with another vector clock
255    pub fn update(&mut self, other: &VectorClock) {
256        for (region, other_clock) in &other.clocks {
257            let current = self.clocks.get(region).unwrap_or(&0);
258            self.clocks
259                .insert(region.clone(), (*current).max(*other_clock));
260        }
261    }
262
263    /// Check if this clock happens before another
264    pub fn happens_before(&self, other: &VectorClock) -> bool {
265        let mut strictly_less = false;
266
267        for region in self.clocks.keys().chain(other.clocks.keys()) {
268            let self_clock = self.clocks.get(region).unwrap_or(&0);
269            let other_clock = other.clocks.get(region).unwrap_or(&0);
270
271            if self_clock > other_clock {
272                return false; // Not happens-before
273            } else if self_clock < other_clock {
274                strictly_less = true;
275            }
276        }
277
278        strictly_less
279    }
280
281    /// Check if clocks are concurrent (neither happens before the other)
282    pub fn is_concurrent(&self, other: &VectorClock) -> bool {
283        !self.happens_before(other) && !other.happens_before(self)
284    }
285}
286
/// Information about a detected conflict, for resolution and auditing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictInfo {
    /// Kind of conflict detected
    pub conflict_type: ConflictType,
    /// The events that conflict with each other
    pub conflicting_events: Vec<StreamEvent>,
    /// Resolution strategy that was (or will be) applied
    pub resolution_strategy: ConflictResolution,
    /// When the conflict was resolved; `None` while still pending
    pub resolved_at: Option<DateTime<Utc>>,
    /// The winning event after resolution; `None` while still pending
    pub resolution_result: Option<StreamEvent>,
}
301
/// Classification of a detected conflict.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictType {
    /// Write-write conflict (concurrent writes to the same resource)
    WriteWrite,
    /// Write-read conflict
    WriteRead,
    /// Schema conflict
    Schema,
    /// Ordering conflict
    Ordering,
}
314
/// Coordinates replication of stream events across configured regions:
/// topology management, event fan-out, conflict detection, and health tracking.
pub struct MultiRegionReplicationManager {
    /// Replication configuration (strategy, conflict resolution, timeouts)
    config: ReplicationConfig,
    /// Configured regions, keyed by region id
    regions: Arc<RwLock<HashMap<String, RegionConfig>>>,
    /// Region id this manager instance runs in (excluded from its own targets)
    current_region: String,
    /// Replication statistics (atomic counters)
    stats: Arc<ReplicationStats>,
    /// Conflicts awaiting manual resolution
    conflict_queue: Arc<Mutex<VecDeque<ConflictInfo>>>,
    /// This region's vector clock, advanced on every outgoing replication
    vector_clock: Arc<Mutex<VectorClock>>,
    /// Health monitoring for all known regions
    health_monitor: Arc<RegionHealthMonitor>,
    /// Bounds the number of concurrent outgoing replications
    replication_semaphore: Arc<Semaphore>,
}
334
/// Replication statistics. All counters are updated with relaxed atomics, so
/// cross-counter reads are not a consistent snapshot.
#[derive(Debug, Default)]
pub struct ReplicationStats {
    pub total_events_replicated: AtomicU64,
    pub successful_replications: AtomicU64,
    pub failed_replications: AtomicU64,
    pub conflicts_detected: AtomicU64,
    pub conflicts_resolved: AtomicU64,
    // Approximate: concurrent writers may interleave updates.
    pub average_replication_latency_ms: AtomicU64,
    pub cross_region_bandwidth_bytes: AtomicU64,
    pub region_failures: AtomicU64,
    pub failover_events: AtomicU64,
}
348
/// Tracks per-region health via periodic checks.
pub struct RegionHealthMonitor {
    /// Health status per region, keyed by region id
    health_status: Arc<RwLock<HashMap<String, RegionHealth>>>,
    /// Health check interval. NOTE(review): stored but not consumed by any
    /// method in this file — presumably read by an external scheduling loop.
    check_interval: Duration,
    /// Health-check statistics (atomic counters)
    stats: Arc<HealthStats>,
}
358
/// Current health status for a single region.
#[derive(Debug, Clone)]
pub struct RegionHealth {
    /// Whether the region is considered healthy
    pub is_healthy: bool,
    /// Last successful health check; `None` if never succeeded
    pub last_success: Option<DateTime<Utc>>,
    /// Last health check attempt (successful or not)
    pub last_check: DateTime<Utc>,
    /// Latency measured by the most recent check (ms); `None` if never checked
    pub latency_ms: Option<u64>,
    /// Consecutive error count since the last successful check
    pub recent_errors: u32,
    /// Health score in [0.0, 1.0]; raised on success, lowered on failure
    pub health_score: f64,
}
375
/// Aggregate health-monitoring statistics (relaxed atomic counters).
#[derive(Debug, Default)]
pub struct HealthStats {
    pub total_health_checks: AtomicU64,
    pub failed_health_checks: AtomicU64,
    pub average_latency_ms: AtomicU64,
    pub regions_down: AtomicU64,
}
384
385impl MultiRegionReplicationManager {
386    /// Create a new multi-region replication manager
387    pub fn new(config: ReplicationConfig, current_region: String) -> Self {
388        let health_monitor = Arc::new(RegionHealthMonitor::new(config.health_check_interval));
389
390        Self {
391            config,
392            current_region,
393            regions: Arc::new(RwLock::new(HashMap::new())),
394            stats: Arc::new(ReplicationStats::default()),
395            conflict_queue: Arc::new(Mutex::new(VecDeque::new())),
396            vector_clock: Arc::new(Mutex::new(VectorClock::new())),
397            health_monitor,
398            replication_semaphore: Arc::new(Semaphore::new(100)), // Max 100 concurrent replications
399        }
400    }
401
402    /// Add a region to the replication topology
403    pub async fn add_region(&self, region_config: RegionConfig) -> Result<()> {
404        let region_id = region_config.region_id.clone();
405        let mut regions = self.regions.write().await;
406        regions.insert(region_id.clone(), region_config);
407
408        // Initialize health monitoring for this region
409        self.health_monitor.add_region(region_id.clone()).await;
410
411        info!("Added region {} to replication topology", region_id);
412        Ok(())
413    }
414
415    /// Remove a region from the replication topology
416    pub async fn remove_region(&self, region_id: &str) -> Result<()> {
417        let mut regions = self.regions.write().await;
418        if regions.remove(region_id).is_some() {
419            self.health_monitor.remove_region(region_id).await;
420            info!("Removed region {} from replication topology", region_id);
421            Ok(())
422        } else {
423            Err(anyhow!("Region {} not found", region_id))
424        }
425    }
426
427    /// Replicate an event to other regions
428    pub async fn replicate_event(&self, event: StreamEvent) -> Result<ReplicatedEvent> {
429        let _permit = self.replication_semaphore.acquire().await?;
430        let start_time = Instant::now();
431
432        // Generate replication metadata
433        let mut vector_clock = self.vector_clock.lock().await;
434        vector_clock.increment(&self.current_region);
435        let replication_metadata = ReplicationMetadata {
436            replication_id: Uuid::new_v4(),
437            source_region: self.current_region.clone(),
438            target_regions: self.get_target_regions(&event).await?,
439            replication_timestamp: Utc::now(),
440            region_status: HashMap::new(),
441            vector_clock: vector_clock.clone(),
442            conflict_info: None,
443        };
444        drop(vector_clock);
445
446        let replicated_event = ReplicatedEvent {
447            event,
448            replication_metadata,
449        };
450
451        // Perform replication based on strategy
452        match self.config.strategy {
453            ReplicationStrategy::FullReplication => {
454                self.replicate_to_all_regions(&replicated_event).await?;
455            }
456            ReplicationStrategy::SelectiveReplication { ref event_types } => {
457                if self.should_replicate_event(&replicated_event.event, event_types) {
458                    self.replicate_to_all_regions(&replicated_event).await?;
459                }
460            }
461            ReplicationStrategy::PartitionBased {
462                ref partition_strategy,
463            } => {
464                self.replicate_partitioned(&replicated_event, partition_strategy)
465                    .await?;
466            }
467            ReplicationStrategy::GeographyBased { ref region_groups } => {
468                self.replicate_by_geography(&replicated_event, region_groups)
469                    .await?;
470            }
471        }
472
473        // Record statistics
474        let replication_latency = start_time.elapsed();
475        self.stats
476            .total_events_replicated
477            .fetch_add(1, Ordering::Relaxed);
478        self.stats
479            .average_replication_latency_ms
480            .store(replication_latency.as_millis() as u64, Ordering::Relaxed);
481
482        info!(
483            "Replicated event {} to {} regions in {:?}",
484            replicated_event.replication_metadata.replication_id,
485            replicated_event.replication_metadata.target_regions.len(),
486            replication_latency
487        );
488
489        Ok(replicated_event)
490    }
491
492    /// Handle an incoming replicated event
493    pub async fn handle_replicated_event(&self, replicated_event: ReplicatedEvent) -> Result<()> {
494        // Check for conflicts
495        if let Some(conflict) = self.detect_conflict(&replicated_event).await? {
496            self.handle_conflict(conflict).await?;
497            return Ok(());
498        }
499
500        // Update vector clock
501        let mut vector_clock = self.vector_clock.lock().await;
502        vector_clock.update(&replicated_event.replication_metadata.vector_clock);
503        drop(vector_clock);
504
505        // Process the event locally
506        self.process_replicated_event(replicated_event).await?;
507
508        Ok(())
509    }
510
511    /// Detect conflicts in replicated events
512    async fn detect_conflict(
513        &self,
514        replicated_event: &ReplicatedEvent,
515    ) -> Result<Option<ConflictInfo>> {
516        // Simple conflict detection based on vector clocks
517        let vector_clock = self.vector_clock.lock().await;
518
519        if vector_clock.is_concurrent(&replicated_event.replication_metadata.vector_clock) {
520            // Potential conflict detected
521            self.stats
522                .conflicts_detected
523                .fetch_add(1, Ordering::Relaxed);
524
525            let conflict_info = ConflictInfo {
526                conflict_type: ConflictType::WriteWrite,
527                conflicting_events: vec![replicated_event.event.clone()],
528                resolution_strategy: self.config.conflict_resolution.clone(),
529                resolved_at: None,
530                resolution_result: None,
531            };
532
533            warn!(
534                "Conflict detected for event {}",
535                replicated_event.replication_metadata.replication_id
536            );
537            return Ok(Some(conflict_info));
538        }
539
540        Ok(None)
541    }
542
543    /// Handle a detected conflict
544    async fn handle_conflict(&self, mut conflict_info: ConflictInfo) -> Result<()> {
545        match &self.config.conflict_resolution {
546            ConflictResolution::LastWriteWins => {
547                // Resolve by timestamp
548                conflict_info.resolution_result = Some(
549                    conflict_info
550                        .conflicting_events
551                        .iter()
552                        .max_by_key(|e| e.metadata().timestamp)
553                        .expect("conflicting_events should not be empty")
554                        .clone(),
555                );
556                conflict_info.resolved_at = Some(Utc::now());
557                self.stats
558                    .conflicts_resolved
559                    .fetch_add(1, Ordering::Relaxed);
560            }
561            ConflictResolution::Manual => {
562                // Queue for manual resolution
563                let mut queue = self.conflict_queue.lock().await;
564                queue.push_back(conflict_info);
565            }
566            _ => {
567                warn!(
568                    "Conflict resolution strategy not implemented: {:?}",
569                    self.config.conflict_resolution
570                );
571            }
572        }
573
574        Ok(())
575    }
576
577    /// Get target regions for an event based on strategy
578    async fn get_target_regions(&self, _event: &StreamEvent) -> Result<Vec<String>> {
579        let regions = self.regions.read().await;
580        let healthy_regions = self.health_monitor.get_healthy_regions().await;
581
582        Ok(regions
583            .keys()
584            .filter(|region_id| {
585                **region_id != self.current_region && healthy_regions.contains(*region_id)
586            })
587            .cloned()
588            .collect())
589    }
590
591    /// Replicate to all available regions
592    async fn replicate_to_all_regions(&self, replicated_event: &ReplicatedEvent) -> Result<()> {
593        let regions = self.regions.read().await;
594        let mut replication_tasks = Vec::new();
595
596        for region_id in &replicated_event.replication_metadata.target_regions {
597            if let Some(region_config) = regions.get(region_id) {
598                let event_clone = replicated_event.clone();
599                let region_config_clone = region_config.clone();
600                let region_id_clone = region_id.clone();
601                let stats = self.stats.clone();
602
603                let task = tokio::spawn(async move {
604                    match Self::send_to_region(event_clone, region_config_clone).await {
605                        Ok(_) => {
606                            stats
607                                .successful_replications
608                                .fetch_add(1, Ordering::Relaxed);
609                        }
610                        Err(e) => {
611                            stats.failed_replications.fetch_add(1, Ordering::Relaxed);
612                            error!("Failed to replicate to region {}: {}", region_id_clone, e);
613                        }
614                    }
615                });
616
617                replication_tasks.push(task);
618            }
619        }
620
621        // Wait for replication based on mode
622        // Wait for all replications (can be optimized based on replication mode)
623        for task in replication_tasks {
624            let _ = task.await;
625        }
626
627        Ok(())
628    }
629
630    /// Send event to a specific region
631    async fn send_to_region(
632        _replicated_event: ReplicatedEvent,
633        _region_config: RegionConfig,
634    ) -> Result<()> {
635        // Simulate network call - in real implementation, this would use HTTP/gRPC
636        time::sleep(Duration::from_millis(50)).await;
637
638        // Simulate occasional failures
639        if fastrand::f32() < 0.05 {
640            // 5% failure rate
641            return Err(anyhow!("Simulated network failure"));
642        }
643
644        Ok(())
645    }
646
647    /// Check if an event should be replicated based on selective replication rules
648    fn should_replicate_event(&self, event: &StreamEvent, event_types: &HashSet<String>) -> bool {
649        let event_type = format!("{:?}", std::mem::discriminant(event));
650        event_types.contains(&event_type)
651    }
652
653    /// Replicate using partition-based strategy
654    async fn replicate_partitioned(
655        &self,
656        _replicated_event: &ReplicatedEvent,
657        _partition_strategy: &PartitionStrategy,
658    ) -> Result<()> {
659        // Implementation for partition-based replication
660        // This would determine which regions to replicate to based on partitioning
661        Ok(())
662    }
663
664    /// Replicate using geography-based strategy
665    async fn replicate_by_geography(
666        &self,
667        _replicated_event: &ReplicatedEvent,
668        _region_groups: &HashMap<String, Vec<String>>,
669    ) -> Result<()> {
670        // Implementation for geography-based replication
671        // This would replicate to regions in the same geographic group
672        Ok(())
673    }
674
675    /// Process a replicated event locally
676    async fn process_replicated_event(&self, _replicated_event: ReplicatedEvent) -> Result<()> {
677        // Implementation for processing replicated events locally
678        // This would typically integrate with the local storage system
679        Ok(())
680    }
681
682    /// Get replication statistics
683    pub fn get_stats(&self) -> ReplicationStats {
684        ReplicationStats {
685            total_events_replicated: AtomicU64::new(
686                self.stats.total_events_replicated.load(Ordering::Relaxed),
687            ),
688            successful_replications: AtomicU64::new(
689                self.stats.successful_replications.load(Ordering::Relaxed),
690            ),
691            failed_replications: AtomicU64::new(
692                self.stats.failed_replications.load(Ordering::Relaxed),
693            ),
694            conflicts_detected: AtomicU64::new(
695                self.stats.conflicts_detected.load(Ordering::Relaxed),
696            ),
697            conflicts_resolved: AtomicU64::new(
698                self.stats.conflicts_resolved.load(Ordering::Relaxed),
699            ),
700            average_replication_latency_ms: AtomicU64::new(
701                self.stats
702                    .average_replication_latency_ms
703                    .load(Ordering::Relaxed),
704            ),
705            cross_region_bandwidth_bytes: AtomicU64::new(
706                self.stats
707                    .cross_region_bandwidth_bytes
708                    .load(Ordering::Relaxed),
709            ),
710            region_failures: AtomicU64::new(self.stats.region_failures.load(Ordering::Relaxed)),
711            failover_events: AtomicU64::new(self.stats.failover_events.load(Ordering::Relaxed)),
712        }
713    }
714
715    /// Get conflicts waiting for manual resolution
716    pub async fn get_pending_conflicts(&self) -> Vec<ConflictInfo> {
717        let queue = self.conflict_queue.lock().await;
718        queue.iter().cloned().collect()
719    }
720}
721
722impl RegionHealthMonitor {
723    /// Create a new region health monitor
724    pub fn new(check_interval: Duration) -> Self {
725        Self {
726            health_status: Arc::new(RwLock::new(HashMap::new())),
727            check_interval,
728            stats: Arc::new(HealthStats::default()),
729        }
730    }
731
732    /// Add a region to monitor
733    pub async fn add_region(&self, region_id: String) {
734        let mut health_status = self.health_status.write().await;
735        health_status.insert(
736            region_id,
737            RegionHealth {
738                is_healthy: true,
739                last_success: None,
740                last_check: Utc::now(),
741                latency_ms: None,
742                recent_errors: 0,
743                health_score: 1.0,
744            },
745        );
746    }
747
748    /// Remove a region from monitoring
749    pub async fn remove_region(&self, region_id: &str) {
750        let mut health_status = self.health_status.write().await;
751        health_status.remove(region_id);
752    }
753
754    /// Get list of healthy regions
755    pub async fn get_healthy_regions(&self) -> Vec<String> {
756        let health_status = self.health_status.read().await;
757        health_status
758            .iter()
759            .filter(|(_, health)| health.is_healthy)
760            .map(|(region_id, _)| region_id.clone())
761            .collect()
762    }
763
764    /// Perform health check for all regions
765    pub async fn check_all_regions(&self) -> Result<()> {
766        let regions: Vec<String> = {
767            let health_status = self.health_status.read().await;
768            health_status.keys().cloned().collect()
769        };
770
771        for region_id in regions {
772            self.check_region_health(&region_id).await?;
773        }
774
775        Ok(())
776    }
777
778    /// Check health of a specific region
779    async fn check_region_health(&self, region_id: &str) -> Result<()> {
780        let start_time = Instant::now();
781        self.stats
782            .total_health_checks
783            .fetch_add(1, Ordering::Relaxed);
784
785        // Simulate health check - in real implementation, this would ping the region
786        let is_healthy = fastrand::f32() > 0.1; // 90% success rate
787        let latency = start_time.elapsed();
788
789        let mut health_status = self.health_status.write().await;
790        if let Some(health) = health_status.get_mut(region_id) {
791            health.last_check = Utc::now();
792            health.latency_ms = Some(latency.as_millis() as u64);
793
794            if is_healthy {
795                health.is_healthy = true;
796                health.last_success = Some(Utc::now());
797                health.recent_errors = 0;
798                health.health_score = (health.health_score + 0.1).min(1.0);
799            } else {
800                health.recent_errors += 1;
801                health.health_score = (health.health_score - 0.2).max(0.0);
802
803                if health.recent_errors > 3 {
804                    health.is_healthy = false;
805                    self.stats
806                        .failed_health_checks
807                        .fetch_add(1, Ordering::Relaxed);
808                }
809            }
810        }
811
812        Ok(())
813    }
814}
815
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;
    use std::collections::HashMap;

    /// Shared config for tests that don't care about strategy details
    /// (previously duplicated verbatim in two tests).
    fn test_config() -> ReplicationConfig {
        ReplicationConfig {
            strategy: ReplicationStrategy::FullReplication,
            conflict_resolution: ConflictResolution::LastWriteWins,
            max_lag_ms: 1000,
            replication_timeout: Duration::from_secs(30),
            enable_compression: true,
            batch_size: 100,
            health_check_interval: Duration::from_secs(60),
            failover_timeout: Duration::from_secs(300),
        }
    }

    /// Build a minimal region config with the given id.
    fn create_test_region(id: &str) -> RegionConfig {
        RegionConfig {
            region_id: id.to_string(),
            region_name: format!("Region {id}"),
            location: GeographicLocation {
                country: "US".to_string(),
                region: "California".to_string(),
                city: "San Francisco".to_string(),
                latitude: 37.7749,
                longitude: -122.4194,
                availability_zone: Some("us-west-1a".to_string()),
            },
            endpoints: vec![RegionEndpoint {
                url: format!("https://{id}.example.com"),
                endpoint_type: EndpointType::Primary,
                is_healthy: true,
                last_health_check: Some(Utc::now()),
                auth: None,
            }],
            priority: 1,
            is_write_active: true,
            is_read_active: true,
            replication_mode: ReplicationMode::Asynchronous,
            latency_map: HashMap::new(),
        }
    }

    /// Build a minimal TripleAdded event.
    #[allow(dead_code)] // retained for future event-level replication tests
    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "http://test.org/subject".to_string(),
            predicate: "http://test.org/predicate".to_string(),
            object: "\"test_value\"".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: Uuid::new_v4().to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: HashMap::new(),
                checksum: None,
            },
        }
    }

    #[tokio::test]
    async fn test_replication_manager_creation() {
        let manager = MultiRegionReplicationManager::new(test_config(), "us-west-1".to_string());
        assert_eq!(manager.current_region, "us-west-1");
    }

    #[tokio::test]
    async fn test_region_management() {
        let manager = MultiRegionReplicationManager::new(test_config(), "us-west-1".to_string());

        // Add regions
        manager
            .add_region(create_test_region("us-east-1"))
            .await
            .unwrap();
        manager
            .add_region(create_test_region("eu-west-1"))
            .await
            .unwrap();

        let regions = manager.regions.read().await;
        assert_eq!(regions.len(), 2);
        assert!(regions.contains_key("us-east-1"));
        assert!(regions.contains_key("eu-west-1"));
    }

    #[tokio::test]
    async fn test_vector_clock() {
        let mut clock1 = VectorClock::new();
        let mut clock2 = VectorClock::new();

        clock1.increment("region1");
        clock2.increment("region2");

        // Independent increments on different regions are concurrent.
        assert!(clock1.is_concurrent(&clock2));
        assert!(!clock1.happens_before(&clock2));

        clock1.update(&clock2);
        clock1.increment("region1");

        // After merging and advancing, clock2 is causally before clock1.
        assert!(clock2.happens_before(&clock1));
        assert!(!clock1.happens_before(&clock2));
    }

    #[tokio::test]
    async fn test_health_monitor() {
        let monitor = RegionHealthMonitor::new(Duration::from_secs(60));

        monitor.add_region("us-west-1".to_string()).await;
        monitor.add_region("us-east-1".to_string()).await;

        // Freshly added regions start healthy.
        let healthy_regions = monitor.get_healthy_regions().await;
        assert_eq!(healthy_regions.len(), 2);

        monitor.check_all_regions().await.unwrap();

        let stats = &monitor.stats;
        assert!(stats.total_health_checks.load(Ordering::Relaxed) >= 2);
    }

    #[test]
    fn test_replication_config() {
        let config = ReplicationConfig {
            strategy: ReplicationStrategy::SelectiveReplication {
                event_types: ["TripleAdded", "TripleRemoved"]
                    .iter()
                    .map(|s| s.to_string())
                    .collect(),
            },
            conflict_resolution: ConflictResolution::RegionPriority {
                priority_order: vec!["us-west-1".to_string(), "us-east-1".to_string()],
            },
            max_lag_ms: 500,
            replication_timeout: Duration::from_secs(15),
            enable_compression: false,
            batch_size: 50,
            health_check_interval: Duration::from_secs(30),
            failover_timeout: Duration::from_secs(120),
        };

        match config.strategy {
            ReplicationStrategy::SelectiveReplication { ref event_types } => {
                assert_eq!(event_types.len(), 2);
            }
            _ => panic!("Wrong strategy type"),
        }
    }
}