1use crate::StreamEvent;
7use anyhow::{anyhow, Result};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::{Mutex, RwLock, Semaphore};
15use tokio::time;
16use tracing::{error, info, warn};
17use uuid::Uuid;
18
/// Static description of one region participating in the replication topology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionConfig {
    /// Unique identifier; used as the key in the manager's topology map.
    pub region_id: String,
    /// Human-readable display name.
    pub region_name: String,
    /// Physical location of the region.
    pub location: GeographicLocation,
    /// Network endpoints exposed by this region.
    pub endpoints: Vec<RegionEndpoint>,
    /// Failover ordering hint — presumably lower is higher priority;
    /// TODO confirm against the failover logic that consumes it.
    pub priority: u8,
    /// Whether writes may currently be routed to this region.
    pub is_write_active: bool,
    /// Whether reads may currently be routed to this region.
    pub is_read_active: bool,
    /// How data is replicated to/from this region.
    pub replication_mode: ReplicationMode,
    /// Latency to other regions keyed by region id (assumed milliseconds —
    /// TODO confirm units at the producer).
    pub latency_map: HashMap<String, u64>,
}
41
/// Physical placement of a region; available for geography-aware routing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeographicLocation {
    pub country: String,
    pub region: String,
    pub city: String,
    pub latitude: f64,
    pub longitude: f64,
    /// Cloud availability zone, when applicable.
    pub availability_zone: Option<String>,
}
58
/// A single network endpoint exposed by a region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionEndpoint {
    pub url: String,
    /// Role this endpoint plays (primary traffic, admin, health checks, …).
    pub endpoint_type: EndpointType,
    /// Last recorded health state; maintained externally to this struct.
    pub is_healthy: bool,
    /// When the endpoint was last probed, if ever.
    pub last_health_check: Option<DateTime<Utc>>,
    /// Optional credentials required to talk to this endpoint.
    pub auth: Option<EndpointAuth>,
}
73
/// Role of a [`RegionEndpoint`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EndpointType {
    /// Main traffic endpoint.
    Primary,
    /// Backup traffic endpoint.
    Secondary,
    /// Administrative operations.
    Admin,
    /// Dedicated health-probe endpoint.
    HealthCheck,
}
86
/// Credentials for an endpoint. `auth_type` is a free-form scheme selector
/// that determines how `credentials` is interpreted — TODO document the
/// accepted scheme names where they are consumed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndpointAuth {
    pub auth_type: String,
    pub credentials: HashMap<String, String>,
}
95
/// Per-region replication semantics.
///
/// NOTE(review): no code in this module branches on these variants yet; the
/// descriptions below reflect the conventional meaning of the names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationMode {
    /// Writes wait for all replicas to acknowledge.
    Synchronous,
    /// Writes return immediately; replicas catch up in the background.
    Asynchronous,
    /// Writes wait for at least `min_replicas` acknowledgements.
    SemiSynchronous { min_replicas: usize },
    /// All writes flow through `leader_region`.
    LeaderFollower { leader_region: String },
    /// Every region accepts writes; conflicts are resolved after the fact.
    ActiveActive,
}
110
/// Tunables controlling cross-region replication.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    /// Which events go to which regions.
    pub strategy: ReplicationStrategy,
    /// How concurrent conflicting writes are resolved.
    pub conflict_resolution: ConflictResolution,
    /// Maximum tolerated replication lag in milliseconds — not read anywhere
    /// in this module; TODO confirm the consumer.
    pub max_lag_ms: u64,
    /// Upper bound on a single replication attempt (not yet enforced here).
    pub replication_timeout: Duration,
    /// Whether payloads are compressed on the wire (not yet used here).
    pub enable_compression: bool,
    /// Events per replication batch (not yet used here).
    pub batch_size: usize,
    /// Interval handed to the [`RegionHealthMonitor`] at construction.
    pub health_check_interval: Duration,
    /// How long to wait before failing over (not yet used here).
    pub failover_timeout: Duration,
}
131
/// Selects which regions receive which events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationStrategy {
    /// Every event goes to every healthy peer region.
    FullReplication,
    /// Only events whose type name appears in `event_types` are replicated.
    SelectiveReplication { event_types: HashSet<String> },
    /// Route events to regions by partition.
    PartitionBased {
        partition_strategy: PartitionStrategy,
    },
    /// Route events to named groups of regions.
    GeographyBased {
        region_groups: HashMap<String, Vec<String>>,
    },
}
148
/// How events are assigned to partitions under
/// [`ReplicationStrategy::PartitionBased`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartitionStrategy {
    /// Hash a field of the event, identified by `hash_key`.
    Hash { hash_key: String },
    /// Map key ranges to regions.
    Range { ranges: Vec<PartitionRange> },
    /// Defer to a named, externally-registered strategy.
    Custom { strategy_name: String },
}
159
/// One key range and the regions it maps to — presumably inclusive of
/// `start` and exclusive of `end`; TODO confirm when range routing is
/// implemented.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionRange {
    pub start: String,
    pub end: String,
    pub regions: Vec<String>,
}
167
/// Policy applied when concurrent writes to the same data are detected.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolution {
    /// Keep the event with the latest timestamp.
    LastWriteWins,
    /// Keep the event with the earliest timestamp (not yet implemented here).
    FirstWriteWins,
    /// Prefer events from regions earlier in `priority_order` (not yet
    /// implemented here).
    RegionPriority { priority_order: Vec<String> },
    /// Defer to a named, externally-registered resolver (not yet implemented).
    Custom { resolver_name: String },
    /// Queue the conflict for a human operator.
    Manual,
}
182
/// A stream event paired with the metadata describing its replication.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicatedEvent {
    /// The payload being replicated.
    pub event: StreamEvent,
    /// Where it came from, where it is going, and how it is ordered.
    pub replication_metadata: ReplicationMetadata,
}
191
/// Bookkeeping attached to every replicated event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationMetadata {
    /// Unique id for this replication attempt.
    pub replication_id: Uuid,
    /// Region that originated the replication.
    pub source_region: String,
    /// Regions the event is being sent to.
    pub target_regions: Vec<String>,
    /// When replication was initiated.
    pub replication_timestamp: DateTime<Utc>,
    /// Per-region delivery status, keyed by region id.
    pub region_status: HashMap<String, ReplicationStatus>,
    /// Causal ordering information for conflict detection.
    pub vector_clock: VectorClock,
    /// Populated when this event was part of a detected conflict.
    pub conflict_info: Option<ConflictInfo>,
}
210
/// Delivery state of an event with respect to a single target region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationStatus {
    /// Not yet attempted.
    Pending,
    /// Delivered successfully at `timestamp`.
    Success { timestamp: DateTime<Utc> },
    /// Delivery failed with `error` at `timestamp`.
    Failed {
        error: String,
        timestamp: DateTime<Utc>,
    },
    /// Delivery attempt started at `started_at` and has not yet resolved.
    InProgress { started_at: DateTime<Utc> },
}
226
/// A vector clock: one logical counter per region, used to establish causal
/// ordering between replicated events. Regions absent from the map are
/// treated as having counter 0.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorClock {
    /// Logical time per region, keyed by region id.
    pub clocks: HashMap<String, u64>,
}
233
234impl Default for VectorClock {
235 fn default() -> Self {
236 Self::new()
237 }
238}
239
240impl VectorClock {
241 pub fn new() -> Self {
243 Self {
244 clocks: HashMap::new(),
245 }
246 }
247
248 pub fn increment(&mut self, region: &str) {
250 let current = self.clocks.get(region).unwrap_or(&0);
251 self.clocks.insert(region.to_string(), current + 1);
252 }
253
254 pub fn update(&mut self, other: &VectorClock) {
256 for (region, other_clock) in &other.clocks {
257 let current = self.clocks.get(region).unwrap_or(&0);
258 self.clocks
259 .insert(region.clone(), (*current).max(*other_clock));
260 }
261 }
262
263 pub fn happens_before(&self, other: &VectorClock) -> bool {
265 let mut strictly_less = false;
266
267 for region in self.clocks.keys().chain(other.clocks.keys()) {
268 let self_clock = self.clocks.get(region).unwrap_or(&0);
269 let other_clock = other.clocks.get(region).unwrap_or(&0);
270
271 if self_clock > other_clock {
272 return false; } else if self_clock < other_clock {
274 strictly_less = true;
275 }
276 }
277
278 strictly_less
279 }
280
281 pub fn is_concurrent(&self, other: &VectorClock) -> bool {
283 !self.happens_before(other) && !other.happens_before(self)
284 }
285}
286
/// Record of a detected replication conflict and (if resolved) its outcome.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictInfo {
    /// What kind of conflict was detected.
    pub conflict_type: ConflictType,
    /// The events that were found to conflict.
    pub conflicting_events: Vec<StreamEvent>,
    /// The policy that was (or will be) applied.
    pub resolution_strategy: ConflictResolution,
    /// When the conflict was resolved; `None` while still pending.
    pub resolved_at: Option<DateTime<Utc>>,
    /// The winning event, once a resolution has been chosen.
    pub resolution_result: Option<StreamEvent>,
}
301
/// Classification of a replication conflict.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictType {
    /// Two concurrent writes to the same data.
    WriteWrite,
    /// A write racing a dependent read.
    WriteRead,
    /// Structurally incompatible changes.
    Schema,
    /// Events arrived in an order that violates causality.
    Ordering,
}
314
/// Coordinates event replication from the local region to its peers:
/// topology bookkeeping, vector-clock ordering, conflict handling, and
/// health-aware target selection.
pub struct MultiRegionReplicationManager {
    /// Replication tunables (strategy, conflict policy, timeouts).
    config: ReplicationConfig,
    /// Known regions keyed by region id.
    regions: Arc<RwLock<HashMap<String, RegionConfig>>>,
    /// Id of the region this manager runs in.
    current_region: String,
    /// Atomic counters describing replication activity.
    stats: Arc<ReplicationStats>,
    /// Conflicts awaiting manual resolution (FIFO).
    conflict_queue: Arc<Mutex<VecDeque<ConflictInfo>>>,
    /// This region's view of logical time across the topology.
    vector_clock: Arc<Mutex<VectorClock>>,
    /// Tracks per-region health; unhealthy regions are skipped as targets.
    health_monitor: Arc<RegionHealthMonitor>,
    /// Bounds the number of concurrently executing replications.
    replication_semaphore: Arc<Semaphore>,
}
334
/// Atomic replication counters; all fields use relaxed ordering and are
/// advisory (reads across fields are not an atomic snapshot).
#[derive(Debug, Default)]
pub struct ReplicationStats {
    pub total_events_replicated: AtomicU64,
    pub successful_replications: AtomicU64,
    pub failed_replications: AtomicU64,
    pub conflicts_detected: AtomicU64,
    pub conflicts_resolved: AtomicU64,
    pub average_replication_latency_ms: AtomicU64,
    // Declared but not yet written anywhere in this module.
    pub cross_region_bandwidth_bytes: AtomicU64,
    // Declared but not yet written anywhere in this module.
    pub region_failures: AtomicU64,
    // Declared but not yet written anywhere in this module.
    pub failover_events: AtomicU64,
}
348
/// Tracks per-region health state and check statistics.
pub struct RegionHealthMonitor {
    /// Current health record per region id.
    health_status: Arc<RwLock<HashMap<String, RegionHealth>>>,
    /// Desired interval between checks — stored but not read in this module;
    /// presumably a background task elsewhere drives `check_all_regions`
    /// with it. TODO confirm.
    check_interval: Duration,
    /// Atomic counters describing health-check activity.
    stats: Arc<HealthStats>,
}
358
/// Health record for one region, updated on every check.
#[derive(Debug, Clone)]
pub struct RegionHealth {
    /// False once the region accumulates more than 3 consecutive errors.
    pub is_healthy: bool,
    /// Time of the most recent successful check, if any.
    pub last_success: Option<DateTime<Utc>>,
    /// Time of the most recent check, successful or not.
    pub last_check: DateTime<Utc>,
    /// Latency of the most recent check in milliseconds.
    pub latency_ms: Option<u64>,
    /// Consecutive failed checks; reset to 0 on success.
    pub recent_errors: u32,
    /// Rolling score in [0.0, 1.0]; +0.1 per success, -0.2 per failure.
    pub health_score: f64,
}
375
/// Atomic health-check counters (relaxed ordering, advisory only).
#[derive(Debug, Default)]
pub struct HealthStats {
    pub total_health_checks: AtomicU64,
    pub failed_health_checks: AtomicU64,
    // Declared but not yet written anywhere in this module.
    pub average_latency_ms: AtomicU64,
    // Declared but not yet written anywhere in this module.
    pub regions_down: AtomicU64,
}
384
385impl MultiRegionReplicationManager {
386 pub fn new(config: ReplicationConfig, current_region: String) -> Self {
388 let health_monitor = Arc::new(RegionHealthMonitor::new(config.health_check_interval));
389
390 Self {
391 config,
392 current_region,
393 regions: Arc::new(RwLock::new(HashMap::new())),
394 stats: Arc::new(ReplicationStats::default()),
395 conflict_queue: Arc::new(Mutex::new(VecDeque::new())),
396 vector_clock: Arc::new(Mutex::new(VectorClock::new())),
397 health_monitor,
398 replication_semaphore: Arc::new(Semaphore::new(100)), }
400 }
401
402 pub async fn add_region(&self, region_config: RegionConfig) -> Result<()> {
404 let region_id = region_config.region_id.clone();
405 let mut regions = self.regions.write().await;
406 regions.insert(region_id.clone(), region_config);
407
408 self.health_monitor.add_region(region_id.clone()).await;
410
411 info!("Added region {} to replication topology", region_id);
412 Ok(())
413 }
414
415 pub async fn remove_region(&self, region_id: &str) -> Result<()> {
417 let mut regions = self.regions.write().await;
418 if regions.remove(region_id).is_some() {
419 self.health_monitor.remove_region(region_id).await;
420 info!("Removed region {} from replication topology", region_id);
421 Ok(())
422 } else {
423 Err(anyhow!("Region {} not found", region_id))
424 }
425 }
426
427 pub async fn replicate_event(&self, event: StreamEvent) -> Result<ReplicatedEvent> {
429 let _permit = self.replication_semaphore.acquire().await?;
430 let start_time = Instant::now();
431
432 let mut vector_clock = self.vector_clock.lock().await;
434 vector_clock.increment(&self.current_region);
435 let replication_metadata = ReplicationMetadata {
436 replication_id: Uuid::new_v4(),
437 source_region: self.current_region.clone(),
438 target_regions: self.get_target_regions(&event).await?,
439 replication_timestamp: Utc::now(),
440 region_status: HashMap::new(),
441 vector_clock: vector_clock.clone(),
442 conflict_info: None,
443 };
444 drop(vector_clock);
445
446 let replicated_event = ReplicatedEvent {
447 event,
448 replication_metadata,
449 };
450
451 match self.config.strategy {
453 ReplicationStrategy::FullReplication => {
454 self.replicate_to_all_regions(&replicated_event).await?;
455 }
456 ReplicationStrategy::SelectiveReplication { ref event_types } => {
457 if self.should_replicate_event(&replicated_event.event, event_types) {
458 self.replicate_to_all_regions(&replicated_event).await?;
459 }
460 }
461 ReplicationStrategy::PartitionBased {
462 ref partition_strategy,
463 } => {
464 self.replicate_partitioned(&replicated_event, partition_strategy)
465 .await?;
466 }
467 ReplicationStrategy::GeographyBased { ref region_groups } => {
468 self.replicate_by_geography(&replicated_event, region_groups)
469 .await?;
470 }
471 }
472
473 let replication_latency = start_time.elapsed();
475 self.stats
476 .total_events_replicated
477 .fetch_add(1, Ordering::Relaxed);
478 self.stats
479 .average_replication_latency_ms
480 .store(replication_latency.as_millis() as u64, Ordering::Relaxed);
481
482 info!(
483 "Replicated event {} to {} regions in {:?}",
484 replicated_event.replication_metadata.replication_id,
485 replicated_event.replication_metadata.target_regions.len(),
486 replication_latency
487 );
488
489 Ok(replicated_event)
490 }
491
492 pub async fn handle_replicated_event(&self, replicated_event: ReplicatedEvent) -> Result<()> {
494 if let Some(conflict) = self.detect_conflict(&replicated_event).await? {
496 self.handle_conflict(conflict).await?;
497 return Ok(());
498 }
499
500 let mut vector_clock = self.vector_clock.lock().await;
502 vector_clock.update(&replicated_event.replication_metadata.vector_clock);
503 drop(vector_clock);
504
505 self.process_replicated_event(replicated_event).await?;
507
508 Ok(())
509 }
510
511 async fn detect_conflict(
513 &self,
514 replicated_event: &ReplicatedEvent,
515 ) -> Result<Option<ConflictInfo>> {
516 let vector_clock = self.vector_clock.lock().await;
518
519 if vector_clock.is_concurrent(&replicated_event.replication_metadata.vector_clock) {
520 self.stats
522 .conflicts_detected
523 .fetch_add(1, Ordering::Relaxed);
524
525 let conflict_info = ConflictInfo {
526 conflict_type: ConflictType::WriteWrite,
527 conflicting_events: vec![replicated_event.event.clone()],
528 resolution_strategy: self.config.conflict_resolution.clone(),
529 resolved_at: None,
530 resolution_result: None,
531 };
532
533 warn!(
534 "Conflict detected for event {}",
535 replicated_event.replication_metadata.replication_id
536 );
537 return Ok(Some(conflict_info));
538 }
539
540 Ok(None)
541 }
542
543 async fn handle_conflict(&self, mut conflict_info: ConflictInfo) -> Result<()> {
545 match &self.config.conflict_resolution {
546 ConflictResolution::LastWriteWins => {
547 conflict_info.resolution_result = Some(
549 conflict_info
550 .conflicting_events
551 .iter()
552 .max_by_key(|e| e.metadata().timestamp)
553 .expect("conflicting_events should not be empty")
554 .clone(),
555 );
556 conflict_info.resolved_at = Some(Utc::now());
557 self.stats
558 .conflicts_resolved
559 .fetch_add(1, Ordering::Relaxed);
560 }
561 ConflictResolution::Manual => {
562 let mut queue = self.conflict_queue.lock().await;
564 queue.push_back(conflict_info);
565 }
566 _ => {
567 warn!(
568 "Conflict resolution strategy not implemented: {:?}",
569 self.config.conflict_resolution
570 );
571 }
572 }
573
574 Ok(())
575 }
576
577 async fn get_target_regions(&self, _event: &StreamEvent) -> Result<Vec<String>> {
579 let regions = self.regions.read().await;
580 let healthy_regions = self.health_monitor.get_healthy_regions().await;
581
582 Ok(regions
583 .keys()
584 .filter(|region_id| {
585 **region_id != self.current_region && healthy_regions.contains(*region_id)
586 })
587 .cloned()
588 .collect())
589 }
590
591 async fn replicate_to_all_regions(&self, replicated_event: &ReplicatedEvent) -> Result<()> {
593 let regions = self.regions.read().await;
594 let mut replication_tasks = Vec::new();
595
596 for region_id in &replicated_event.replication_metadata.target_regions {
597 if let Some(region_config) = regions.get(region_id) {
598 let event_clone = replicated_event.clone();
599 let region_config_clone = region_config.clone();
600 let region_id_clone = region_id.clone();
601 let stats = self.stats.clone();
602
603 let task = tokio::spawn(async move {
604 match Self::send_to_region(event_clone, region_config_clone).await {
605 Ok(_) => {
606 stats
607 .successful_replications
608 .fetch_add(1, Ordering::Relaxed);
609 }
610 Err(e) => {
611 stats.failed_replications.fetch_add(1, Ordering::Relaxed);
612 error!("Failed to replicate to region {}: {}", region_id_clone, e);
613 }
614 }
615 });
616
617 replication_tasks.push(task);
618 }
619 }
620
621 for task in replication_tasks {
624 let _ = task.await;
625 }
626
627 Ok(())
628 }
629
630 async fn send_to_region(
632 _replicated_event: ReplicatedEvent,
633 _region_config: RegionConfig,
634 ) -> Result<()> {
635 time::sleep(Duration::from_millis(50)).await;
637
638 if fastrand::f32() < 0.05 {
640 return Err(anyhow!("Simulated network failure"));
642 }
643
644 Ok(())
645 }
646
647 fn should_replicate_event(&self, event: &StreamEvent, event_types: &HashSet<String>) -> bool {
649 let event_type = format!("{:?}", std::mem::discriminant(event));
650 event_types.contains(&event_type)
651 }
652
653 async fn replicate_partitioned(
655 &self,
656 _replicated_event: &ReplicatedEvent,
657 _partition_strategy: &PartitionStrategy,
658 ) -> Result<()> {
659 Ok(())
662 }
663
664 async fn replicate_by_geography(
666 &self,
667 _replicated_event: &ReplicatedEvent,
668 _region_groups: &HashMap<String, Vec<String>>,
669 ) -> Result<()> {
670 Ok(())
673 }
674
675 async fn process_replicated_event(&self, _replicated_event: ReplicatedEvent) -> Result<()> {
677 Ok(())
680 }
681
682 pub fn get_stats(&self) -> ReplicationStats {
684 ReplicationStats {
685 total_events_replicated: AtomicU64::new(
686 self.stats.total_events_replicated.load(Ordering::Relaxed),
687 ),
688 successful_replications: AtomicU64::new(
689 self.stats.successful_replications.load(Ordering::Relaxed),
690 ),
691 failed_replications: AtomicU64::new(
692 self.stats.failed_replications.load(Ordering::Relaxed),
693 ),
694 conflicts_detected: AtomicU64::new(
695 self.stats.conflicts_detected.load(Ordering::Relaxed),
696 ),
697 conflicts_resolved: AtomicU64::new(
698 self.stats.conflicts_resolved.load(Ordering::Relaxed),
699 ),
700 average_replication_latency_ms: AtomicU64::new(
701 self.stats
702 .average_replication_latency_ms
703 .load(Ordering::Relaxed),
704 ),
705 cross_region_bandwidth_bytes: AtomicU64::new(
706 self.stats
707 .cross_region_bandwidth_bytes
708 .load(Ordering::Relaxed),
709 ),
710 region_failures: AtomicU64::new(self.stats.region_failures.load(Ordering::Relaxed)),
711 failover_events: AtomicU64::new(self.stats.failover_events.load(Ordering::Relaxed)),
712 }
713 }
714
715 pub async fn get_pending_conflicts(&self) -> Vec<ConflictInfo> {
717 let queue = self.conflict_queue.lock().await;
718 queue.iter().cloned().collect()
719 }
720}
721
722impl RegionHealthMonitor {
723 pub fn new(check_interval: Duration) -> Self {
725 Self {
726 health_status: Arc::new(RwLock::new(HashMap::new())),
727 check_interval,
728 stats: Arc::new(HealthStats::default()),
729 }
730 }
731
732 pub async fn add_region(&self, region_id: String) {
734 let mut health_status = self.health_status.write().await;
735 health_status.insert(
736 region_id,
737 RegionHealth {
738 is_healthy: true,
739 last_success: None,
740 last_check: Utc::now(),
741 latency_ms: None,
742 recent_errors: 0,
743 health_score: 1.0,
744 },
745 );
746 }
747
748 pub async fn remove_region(&self, region_id: &str) {
750 let mut health_status = self.health_status.write().await;
751 health_status.remove(region_id);
752 }
753
754 pub async fn get_healthy_regions(&self) -> Vec<String> {
756 let health_status = self.health_status.read().await;
757 health_status
758 .iter()
759 .filter(|(_, health)| health.is_healthy)
760 .map(|(region_id, _)| region_id.clone())
761 .collect()
762 }
763
764 pub async fn check_all_regions(&self) -> Result<()> {
766 let regions: Vec<String> = {
767 let health_status = self.health_status.read().await;
768 health_status.keys().cloned().collect()
769 };
770
771 for region_id in regions {
772 self.check_region_health(®ion_id).await?;
773 }
774
775 Ok(())
776 }
777
778 async fn check_region_health(&self, region_id: &str) -> Result<()> {
780 let start_time = Instant::now();
781 self.stats
782 .total_health_checks
783 .fetch_add(1, Ordering::Relaxed);
784
785 let is_healthy = fastrand::f32() > 0.1; let latency = start_time.elapsed();
788
789 let mut health_status = self.health_status.write().await;
790 if let Some(health) = health_status.get_mut(region_id) {
791 health.last_check = Utc::now();
792 health.latency_ms = Some(latency.as_millis() as u64);
793
794 if is_healthy {
795 health.is_healthy = true;
796 health.last_success = Some(Utc::now());
797 health.recent_errors = 0;
798 health.health_score = (health.health_score + 0.1).min(1.0);
799 } else {
800 health.recent_errors += 1;
801 health.health_score = (health.health_score - 0.2).max(0.0);
802
803 if health.recent_errors > 3 {
804 health.is_healthy = false;
805 self.stats
806 .failed_health_checks
807 .fetch_add(1, Ordering::Relaxed);
808 }
809 }
810 }
811
812 Ok(())
813 }
814}
815
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;
    use std::collections::HashMap;

    /// Baseline config used by the manager-oriented tests: full replication
    /// with last-write-wins conflict handling.
    fn full_replication_config() -> ReplicationConfig {
        ReplicationConfig {
            strategy: ReplicationStrategy::FullReplication,
            conflict_resolution: ConflictResolution::LastWriteWins,
            max_lag_ms: 1000,
            replication_timeout: Duration::from_secs(30),
            enable_compression: true,
            batch_size: 100,
            health_check_interval: Duration::from_secs(60),
            failover_timeout: Duration::from_secs(300),
        }
    }

    /// Builds a throwaway region in San Francisco with one healthy
    /// primary endpoint.
    fn create_test_region(id: &str) -> RegionConfig {
        RegionConfig {
            region_id: id.to_string(),
            region_name: format!("Region {id}"),
            location: GeographicLocation {
                country: "US".to_string(),
                region: "California".to_string(),
                city: "San Francisco".to_string(),
                latitude: 37.7749,
                longitude: -122.4194,
                availability_zone: Some("us-west-1a".to_string()),
            },
            endpoints: vec![RegionEndpoint {
                url: format!("https://{id}.example.com"),
                endpoint_type: EndpointType::Primary,
                is_healthy: true,
                last_health_check: Some(Utc::now()),
                auth: None,
            }],
            priority: 1,
            is_write_active: true,
            is_read_active: true,
            replication_mode: ReplicationMode::Asynchronous,
            latency_map: HashMap::new(),
        }
    }

    /// A minimal triple-added event with fresh metadata.
    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "http://test.org/subject".to_string(),
            predicate: "http://test.org/predicate".to_string(),
            object: "\"test_value\"".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: Uuid::new_v4().to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: HashMap::new(),
                checksum: None,
            },
        }
    }

    #[tokio::test]
    async fn test_replication_manager_creation() {
        let manager =
            MultiRegionReplicationManager::new(full_replication_config(), "us-west-1".to_string());
        assert_eq!(manager.current_region, "us-west-1");
    }

    #[tokio::test]
    async fn test_region_management() {
        let manager =
            MultiRegionReplicationManager::new(full_replication_config(), "us-west-1".to_string());

        for id in ["us-east-1", "eu-west-1"] {
            manager.add_region(create_test_region(id)).await.unwrap();
        }

        let regions = manager.regions.read().await;
        assert_eq!(regions.len(), 2);
        assert!(regions.contains_key("us-east-1"));
        assert!(regions.contains_key("eu-west-1"));
    }

    #[tokio::test]
    async fn test_vector_clock() {
        let mut clock1 = VectorClock::new();
        let mut clock2 = VectorClock::new();

        clock1.increment("region1");
        clock2.increment("region2");

        // Disjoint increments: concurrent, no ordering either way.
        assert!(clock1.is_concurrent(&clock2));
        assert!(!clock1.happens_before(&clock2));

        // Merging clock2 and advancing makes clock1 strictly later.
        clock1.update(&clock2);
        clock1.increment("region1");

        assert!(clock2.happens_before(&clock1));
        assert!(!clock1.happens_before(&clock2));
    }

    #[tokio::test]
    async fn test_health_monitor() {
        let monitor = RegionHealthMonitor::new(Duration::from_secs(60));

        monitor.add_region("us-west-1".to_string()).await;
        monitor.add_region("us-east-1".to_string()).await;

        // Regions start out optimistically healthy.
        assert_eq!(monitor.get_healthy_regions().await.len(), 2);

        monitor.check_all_regions().await.unwrap();
        assert!(monitor.stats.total_health_checks.load(Ordering::Relaxed) >= 2);
    }

    #[test]
    fn test_replication_config() {
        let config = ReplicationConfig {
            strategy: ReplicationStrategy::SelectiveReplication {
                event_types: ["TripleAdded", "TripleRemoved"]
                    .iter()
                    .map(|s| s.to_string())
                    .collect(),
            },
            conflict_resolution: ConflictResolution::RegionPriority {
                priority_order: vec!["us-west-1".to_string(), "us-east-1".to_string()],
            },
            max_lag_ms: 500,
            replication_timeout: Duration::from_secs(15),
            enable_compression: false,
            batch_size: 50,
            health_check_interval: Duration::from_secs(30),
            failover_timeout: Duration::from_secs(120),
        };

        match config.strategy {
            ReplicationStrategy::SelectiveReplication { ref event_types } => {
                assert_eq!(event_types.len(), 2);
            }
            _ => panic!("Wrong strategy type"),
        }
    }
}