use crate::StreamEvent;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, RwLock, Semaphore};
use tokio::time;
use tracing::{error, info, warn};
use uuid::Uuid;
/// Static configuration for one region participating in the replication
/// topology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionConfig {
    /// Unique identifier; used as the key in the manager's region map.
    pub region_id: String,
    /// Human-readable display name.
    pub region_name: String,
    /// Physical location of the region.
    pub location: GeographicLocation,
    /// Network endpoints exposed by this region.
    pub endpoints: Vec<RegionEndpoint>,
    /// Relative priority value for this region (ordering semantics are
    /// defined by consumers of this config, not shown here).
    pub priority: u8,
    /// Whether the region currently accepts writes.
    pub is_write_active: bool,
    /// Whether the region currently serves reads.
    pub is_read_active: bool,
    /// How events are replicated to/from this region.
    pub replication_mode: ReplicationMode,
    /// Latency to peer regions keyed by region id (units assumed to be
    /// milliseconds — confirm with producers of this map).
    pub latency_map: HashMap<String, u64>,
}
/// Geographic placement of a region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeographicLocation {
    pub country: String,
    /// Administrative region/state (e.g. "California").
    pub region: String,
    pub city: String,
    pub latitude: f64,
    pub longitude: f64,
    /// Cloud availability zone, when applicable (e.g. "us-west-1a").
    pub availability_zone: Option<String>,
}
/// A single network endpoint belonging to a region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionEndpoint {
    pub url: String,
    /// Role of this endpoint within the region.
    pub endpoint_type: EndpointType,
    /// Last known health state as recorded by whoever probes this endpoint.
    pub is_healthy: bool,
    pub last_health_check: Option<DateTime<Utc>>,
    /// Optional credentials for authenticating against this endpoint.
    pub auth: Option<EndpointAuth>,
}
/// Role an endpoint plays within its region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EndpointType {
    Primary,
    Secondary,
    Admin,
    HealthCheck,
}
/// Authentication material for a region endpoint.
///
/// NOTE(review): credentials are held as plain strings and derive
/// `Serialize`/`Debug` — confirm they are not logged or persisted
/// unencrypted by callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndpointAuth {
    /// Scheme identifier (stringly typed; the accepted values are defined
    /// by the consumer of this struct).
    pub auth_type: String,
    /// Key/value credential material (e.g. token, username/password).
    pub credentials: HashMap<String, String>,
}
/// How writes propagate between regions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationMode {
    /// Writes are acknowledged only after all replicas confirm.
    Synchronous,
    /// Writes are acknowledged locally and propagated in the background.
    Asynchronous,
    /// Writes wait for at least `min_replicas` confirmations.
    SemiSynchronous { min_replicas: usize },
    /// A single leader region accepts writes; followers replicate from it.
    LeaderFollower { leader_region: String },
    /// All regions accept writes concurrently.
    ActiveActive,
}
/// Tunable parameters for the replication manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    /// Which events go to which regions.
    pub strategy: ReplicationStrategy,
    /// Policy applied when concurrent conflicting writes are detected.
    pub conflict_resolution: ConflictResolution,
    /// Maximum tolerated replication lag, in milliseconds.
    pub max_lag_ms: u64,
    pub replication_timeout: Duration,
    pub enable_compression: bool,
    /// Number of events grouped per replication batch.
    pub batch_size: usize,
    /// How often region health is probed.
    pub health_check_interval: Duration,
    pub failover_timeout: Duration,
}
/// Strategy for choosing which events replicate to which regions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationStrategy {
    /// Every event is replicated to every healthy peer region.
    FullReplication,
    /// Only events whose type name is listed in `event_types` are replicated.
    SelectiveReplication { event_types: HashSet<String> },
    /// Events are routed to regions by partition.
    PartitionBased {
        partition_strategy: PartitionStrategy,
    },
    /// Events are routed to named groups of regions.
    GeographyBased {
        region_groups: HashMap<String, Vec<String>>,
    },
}
/// How events are assigned to partitions under `ReplicationStrategy::PartitionBased`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartitionStrategy {
    /// Hash a named key of the event to pick a partition.
    Hash { hash_key: String },
    /// Route by key range.
    Range { ranges: Vec<PartitionRange> },
    /// Delegate to a named, externally registered strategy.
    Custom { strategy_name: String },
}
/// A contiguous key range mapped to a set of regions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionRange {
    /// Inclusive/exclusive bounds are not enforced here — defined by the
    /// partitioning implementation that consumes this range.
    pub start: String,
    pub end: String,
    /// Regions responsible for keys in this range.
    pub regions: Vec<String>,
}
/// Policy for resolving concurrent conflicting writes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolution {
    /// The event with the newest timestamp wins.
    LastWriteWins,
    /// The event with the oldest timestamp wins.
    FirstWriteWins,
    /// Resolve by fixed region precedence, highest priority first.
    RegionPriority { priority_order: Vec<String> },
    /// Delegate to a named, externally registered resolver.
    Custom { resolver_name: String },
    /// Queue the conflict for a human operator to resolve.
    Manual,
}
/// A stream event together with its cross-region replication envelope.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicatedEvent {
    /// The payload being replicated.
    pub event: StreamEvent,
    /// Provenance, targets, causality clock, and per-region status.
    pub replication_metadata: ReplicationMetadata,
}
/// Envelope describing one replication of an event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationMetadata {
    /// Unique id for this replication attempt.
    pub replication_id: Uuid,
    /// Region that originated the event.
    pub source_region: String,
    /// Regions the event is being replicated to.
    pub target_regions: Vec<String>,
    pub replication_timestamp: DateTime<Utc>,
    /// Per-target-region delivery status.
    pub region_status: HashMap<String, ReplicationStatus>,
    /// Causality clock stamped by the source region at send time.
    pub vector_clock: VectorClock,
    /// Populated when a conflict was detected for this event.
    pub conflict_info: Option<ConflictInfo>,
}
/// Delivery state of an event for a single target region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationStatus {
    Pending,
    Success { timestamp: DateTime<Utc> },
    Failed {
        error: String,
        timestamp: DateTime<Utc>,
    },
    InProgress { started_at: DateTime<Utc> },
}
/// A vector clock: one monotonically increasing counter per region,
/// used to order events and detect concurrent (conflicting) writes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorClock {
    /// Counter per region id; absent regions are treated as 0.
    pub clocks: HashMap<String, u64>,
}
impl Default for VectorClock {
fn default() -> Self {
Self::new()
}
}
impl VectorClock {
    /// An empty clock (all regions implicitly at 0).
    pub fn new() -> Self {
        Self {
            clocks: HashMap::new(),
        }
    }

    /// Advance this region's counter by one.
    ///
    /// Uses the `entry` API so the map is probed once instead of the
    /// previous `get` + `insert` double lookup.
    pub fn increment(&mut self, region: &str) {
        *self.clocks.entry(region.to_string()).or_insert(0) += 1;
    }

    /// Merge `other` into `self`, taking the element-wise maximum.
    /// Regions present only in `self` are left untouched.
    pub fn update(&mut self, other: &VectorClock) {
        for (region, &other_clock) in &other.clocks {
            let entry = self.clocks.entry(region.clone()).or_insert(0);
            *entry = (*entry).max(other_clock);
        }
    }

    /// True iff `self` causally precedes `other`: every component of `self`
    /// is <= the corresponding component of `other`, and at least one is
    /// strictly less. (The key chain may visit a region twice when it is in
    /// both maps; the comparison is idempotent, so the result is unaffected.)
    pub fn happens_before(&self, other: &VectorClock) -> bool {
        let mut strictly_less = false;
        for region in self.clocks.keys().chain(other.clocks.keys()) {
            let self_clock = self.clocks.get(region).unwrap_or(&0);
            let other_clock = other.clocks.get(region).unwrap_or(&0);
            if self_clock > other_clock {
                return false;
            } else if self_clock < other_clock {
                strictly_less = true;
            }
        }
        strictly_less
    }

    /// True iff neither clock causally precedes the other — i.e. the two
    /// events are concurrent and may conflict.
    pub fn is_concurrent(&self, other: &VectorClock) -> bool {
        !self.happens_before(other) && !other.happens_before(self)
    }
}
/// A detected replication conflict and (once available) its resolution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictInfo {
    pub conflict_type: ConflictType,
    /// The events involved in the conflict.
    pub conflicting_events: Vec<StreamEvent>,
    /// Policy that was/will be applied to resolve this conflict.
    pub resolution_strategy: ConflictResolution,
    /// When resolution completed; `None` while unresolved.
    pub resolved_at: Option<DateTime<Utc>>,
    /// The winning event, once resolved.
    pub resolution_result: Option<StreamEvent>,
}
/// Category of a detected replication conflict.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictType {
    WriteWrite,
    WriteRead,
    Schema,
    Ordering,
}
/// Coordinates replication of stream events across geographic regions:
/// topology management, strategy-based fan-out, vector-clock causality,
/// conflict detection/resolution, and health-aware target selection.
pub struct MultiRegionReplicationManager {
    config: ReplicationConfig,
    /// Known regions keyed by region id.
    regions: Arc<RwLock<HashMap<String, RegionConfig>>>,
    /// Id of the region this manager instance runs in.
    current_region: String,
    /// Shared atomic counters; snapshot via `get_stats`.
    stats: Arc<ReplicationStats>,
    /// Conflicts awaiting manual resolution (see `ConflictResolution::Manual`).
    conflict_queue: Arc<Mutex<VecDeque<ConflictInfo>>>,
    /// This region's causality clock.
    vector_clock: Arc<Mutex<VectorClock>>,
    health_monitor: Arc<RegionHealthMonitor>,
    /// Caps the number of concurrently running replication operations.
    replication_semaphore: Arc<Semaphore>,
}
/// Atomic counters describing replication activity.
///
/// Fields are atomics so the struct can be shared via `Arc` and updated
/// without locks; use `MultiRegionReplicationManager::get_stats` for a
/// point-in-time snapshot.
#[derive(Debug, Default)]
pub struct ReplicationStats {
    pub total_events_replicated: AtomicU64,
    pub successful_replications: AtomicU64,
    pub failed_replications: AtomicU64,
    pub conflicts_detected: AtomicU64,
    pub conflicts_resolved: AtomicU64,
    /// Replication latency statistic, in milliseconds.
    pub average_replication_latency_ms: AtomicU64,
    pub cross_region_bandwidth_bytes: AtomicU64,
    pub region_failures: AtomicU64,
    pub failover_events: AtomicU64,
}
/// Tracks per-region health and exposes the set of healthy regions to the
/// replication manager.
pub struct RegionHealthMonitor {
    /// Current health state keyed by region id.
    health_status: Arc<RwLock<HashMap<String, RegionHealth>>>,
    /// Intended probe cadence (stored for callers that drive the checks;
    /// not consumed inside this struct's own methods as shown here).
    check_interval: Duration,
    stats: Arc<HealthStats>,
}
/// Health snapshot for one region.
#[derive(Debug, Clone)]
pub struct RegionHealth {
    pub is_healthy: bool,
    /// Last successful check; `None` until one succeeds.
    pub last_success: Option<DateTime<Utc>>,
    pub last_check: DateTime<Utc>,
    /// Latency of the most recent check, in milliseconds.
    pub latency_ms: Option<u64>,
    /// Consecutive failed checks since the last success.
    pub recent_errors: u32,
    /// Score in [0.0, 1.0]; nudged up on success, down on failure.
    pub health_score: f64,
}
/// Atomic counters for health-check activity.
#[derive(Debug, Default)]
pub struct HealthStats {
    pub total_health_checks: AtomicU64,
    /// Checks that tripped a region into the unhealthy state.
    pub failed_health_checks: AtomicU64,
    pub average_latency_ms: AtomicU64,
    pub regions_down: AtomicU64,
}
impl MultiRegionReplicationManager {
pub fn new(config: ReplicationConfig, current_region: String) -> Self {
let health_monitor = Arc::new(RegionHealthMonitor::new(config.health_check_interval));
Self {
config,
current_region,
regions: Arc::new(RwLock::new(HashMap::new())),
stats: Arc::new(ReplicationStats::default()),
conflict_queue: Arc::new(Mutex::new(VecDeque::new())),
vector_clock: Arc::new(Mutex::new(VectorClock::new())),
health_monitor,
replication_semaphore: Arc::new(Semaphore::new(100)), }
}
pub async fn add_region(&self, region_config: RegionConfig) -> Result<()> {
let region_id = region_config.region_id.clone();
let mut regions = self.regions.write().await;
regions.insert(region_id.clone(), region_config);
self.health_monitor.add_region(region_id.clone()).await;
info!("Added region {} to replication topology", region_id);
Ok(())
}
pub async fn remove_region(&self, region_id: &str) -> Result<()> {
let mut regions = self.regions.write().await;
if regions.remove(region_id).is_some() {
self.health_monitor.remove_region(region_id).await;
info!("Removed region {} from replication topology", region_id);
Ok(())
} else {
Err(anyhow!("Region {} not found", region_id))
}
}
pub async fn replicate_event(&self, event: StreamEvent) -> Result<ReplicatedEvent> {
let _permit = self.replication_semaphore.acquire().await?;
let start_time = Instant::now();
let mut vector_clock = self.vector_clock.lock().await;
vector_clock.increment(&self.current_region);
let replication_metadata = ReplicationMetadata {
replication_id: Uuid::new_v4(),
source_region: self.current_region.clone(),
target_regions: self.get_target_regions(&event).await?,
replication_timestamp: Utc::now(),
region_status: HashMap::new(),
vector_clock: vector_clock.clone(),
conflict_info: None,
};
drop(vector_clock);
let replicated_event = ReplicatedEvent {
event,
replication_metadata,
};
match self.config.strategy {
ReplicationStrategy::FullReplication => {
self.replicate_to_all_regions(&replicated_event).await?;
}
ReplicationStrategy::SelectiveReplication { ref event_types } => {
if self.should_replicate_event(&replicated_event.event, event_types) {
self.replicate_to_all_regions(&replicated_event).await?;
}
}
ReplicationStrategy::PartitionBased {
ref partition_strategy,
} => {
self.replicate_partitioned(&replicated_event, partition_strategy)
.await?;
}
ReplicationStrategy::GeographyBased { ref region_groups } => {
self.replicate_by_geography(&replicated_event, region_groups)
.await?;
}
}
let replication_latency = start_time.elapsed();
self.stats
.total_events_replicated
.fetch_add(1, Ordering::Relaxed);
self.stats
.average_replication_latency_ms
.store(replication_latency.as_millis() as u64, Ordering::Relaxed);
info!(
"Replicated event {} to {} regions in {:?}",
replicated_event.replication_metadata.replication_id,
replicated_event.replication_metadata.target_regions.len(),
replication_latency
);
Ok(replicated_event)
}
pub async fn handle_replicated_event(&self, replicated_event: ReplicatedEvent) -> Result<()> {
if let Some(conflict) = self.detect_conflict(&replicated_event).await? {
self.handle_conflict(conflict).await?;
return Ok(());
}
let mut vector_clock = self.vector_clock.lock().await;
vector_clock.update(&replicated_event.replication_metadata.vector_clock);
drop(vector_clock);
self.process_replicated_event(replicated_event).await?;
Ok(())
}
async fn detect_conflict(
&self,
replicated_event: &ReplicatedEvent,
) -> Result<Option<ConflictInfo>> {
let vector_clock = self.vector_clock.lock().await;
if vector_clock.is_concurrent(&replicated_event.replication_metadata.vector_clock) {
self.stats
.conflicts_detected
.fetch_add(1, Ordering::Relaxed);
let conflict_info = ConflictInfo {
conflict_type: ConflictType::WriteWrite,
conflicting_events: vec![replicated_event.event.clone()],
resolution_strategy: self.config.conflict_resolution.clone(),
resolved_at: None,
resolution_result: None,
};
warn!(
"Conflict detected for event {}",
replicated_event.replication_metadata.replication_id
);
return Ok(Some(conflict_info));
}
Ok(None)
}
async fn handle_conflict(&self, mut conflict_info: ConflictInfo) -> Result<()> {
match &self.config.conflict_resolution {
ConflictResolution::LastWriteWins => {
conflict_info.resolution_result = Some(
conflict_info
.conflicting_events
.iter()
.max_by_key(|e| e.metadata().timestamp)
.expect("conflicting_events should not be empty")
.clone(),
);
conflict_info.resolved_at = Some(Utc::now());
self.stats
.conflicts_resolved
.fetch_add(1, Ordering::Relaxed);
}
ConflictResolution::Manual => {
let mut queue = self.conflict_queue.lock().await;
queue.push_back(conflict_info);
}
_ => {
warn!(
"Conflict resolution strategy not implemented: {:?}",
self.config.conflict_resolution
);
}
}
Ok(())
}
async fn get_target_regions(&self, _event: &StreamEvent) -> Result<Vec<String>> {
let regions = self.regions.read().await;
let healthy_regions = self.health_monitor.get_healthy_regions().await;
Ok(regions
.keys()
.filter(|region_id| {
**region_id != self.current_region && healthy_regions.contains(*region_id)
})
.cloned()
.collect())
}
async fn replicate_to_all_regions(&self, replicated_event: &ReplicatedEvent) -> Result<()> {
let regions = self.regions.read().await;
let mut replication_tasks = Vec::new();
for region_id in &replicated_event.replication_metadata.target_regions {
if let Some(region_config) = regions.get(region_id) {
let event_clone = replicated_event.clone();
let region_config_clone = region_config.clone();
let region_id_clone = region_id.clone();
let stats = self.stats.clone();
let task = tokio::spawn(async move {
match Self::send_to_region(event_clone, region_config_clone).await {
Ok(_) => {
stats
.successful_replications
.fetch_add(1, Ordering::Relaxed);
}
Err(e) => {
stats.failed_replications.fetch_add(1, Ordering::Relaxed);
error!("Failed to replicate to region {}: {}", region_id_clone, e);
}
}
});
replication_tasks.push(task);
}
}
for task in replication_tasks {
let _ = task.await;
}
Ok(())
}
async fn send_to_region(
_replicated_event: ReplicatedEvent,
_region_config: RegionConfig,
) -> Result<()> {
time::sleep(Duration::from_millis(50)).await;
if fastrand::f32() < 0.05 {
return Err(anyhow!("Simulated network failure"));
}
Ok(())
}
fn should_replicate_event(&self, event: &StreamEvent, event_types: &HashSet<String>) -> bool {
let event_type = format!("{:?}", std::mem::discriminant(event));
event_types.contains(&event_type)
}
async fn replicate_partitioned(
&self,
_replicated_event: &ReplicatedEvent,
_partition_strategy: &PartitionStrategy,
) -> Result<()> {
Ok(())
}
async fn replicate_by_geography(
&self,
_replicated_event: &ReplicatedEvent,
_region_groups: &HashMap<String, Vec<String>>,
) -> Result<()> {
Ok(())
}
async fn process_replicated_event(&self, _replicated_event: ReplicatedEvent) -> Result<()> {
Ok(())
}
pub fn get_stats(&self) -> ReplicationStats {
ReplicationStats {
total_events_replicated: AtomicU64::new(
self.stats.total_events_replicated.load(Ordering::Relaxed),
),
successful_replications: AtomicU64::new(
self.stats.successful_replications.load(Ordering::Relaxed),
),
failed_replications: AtomicU64::new(
self.stats.failed_replications.load(Ordering::Relaxed),
),
conflicts_detected: AtomicU64::new(
self.stats.conflicts_detected.load(Ordering::Relaxed),
),
conflicts_resolved: AtomicU64::new(
self.stats.conflicts_resolved.load(Ordering::Relaxed),
),
average_replication_latency_ms: AtomicU64::new(
self.stats
.average_replication_latency_ms
.load(Ordering::Relaxed),
),
cross_region_bandwidth_bytes: AtomicU64::new(
self.stats
.cross_region_bandwidth_bytes
.load(Ordering::Relaxed),
),
region_failures: AtomicU64::new(self.stats.region_failures.load(Ordering::Relaxed)),
failover_events: AtomicU64::new(self.stats.failover_events.load(Ordering::Relaxed)),
}
}
pub async fn get_pending_conflicts(&self) -> Vec<ConflictInfo> {
let queue = self.conflict_queue.lock().await;
queue.iter().cloned().collect()
}
}
impl RegionHealthMonitor {
    /// Create a monitor intended to be polled every `check_interval`
    /// (the caller drives the cadence; this struct stores it only).
    pub fn new(check_interval: Duration) -> Self {
        Self {
            health_status: Arc::new(RwLock::new(HashMap::new())),
            check_interval,
            stats: Arc::new(HealthStats::default()),
        }
    }

    /// Start tracking `region_id`, optimistically marked healthy.
    pub async fn add_region(&self, region_id: String) {
        let mut health_status = self.health_status.write().await;
        health_status.insert(
            region_id,
            RegionHealth {
                is_healthy: true,
                last_success: None,
                last_check: Utc::now(),
                latency_ms: None,
                recent_errors: 0,
                health_score: 1.0,
            },
        );
    }

    /// Stop tracking `region_id`; no-op if it was never tracked.
    pub async fn remove_region(&self, region_id: &str) {
        let mut health_status = self.health_status.write().await;
        health_status.remove(region_id);
    }

    /// Ids of all regions currently considered healthy.
    pub async fn get_healthy_regions(&self) -> Vec<String> {
        let health_status = self.health_status.read().await;
        health_status
            .iter()
            .filter(|(_, health)| health.is_healthy)
            .map(|(region_id, _)| region_id.clone())
            .collect()
    }

    /// Run one health check for every tracked region.
    pub async fn check_all_regions(&self) -> Result<()> {
        // Snapshot the ids first so the write lock taken inside
        // `check_region_health` is not acquired while a read lock is held.
        let regions: Vec<String> = {
            let health_status = self.health_status.read().await;
            health_status.keys().cloned().collect()
        };
        for region_id in regions {
            // Fixed: the original line contained the mojibake `®ion_id`
            // (a mangled `&region_id`), which does not compile.
            self.check_region_health(&region_id).await?;
        }
        Ok(())
    }

    /// Probe one region and fold the result into its health record.
    ///
    /// The probe is simulated (random 10% failure via `fastrand`), so the
    /// recorded latency only measures local bookkeeping. A region is marked
    /// unhealthy after more than 3 consecutive failures; the health score is
    /// nudged +0.1 on success / -0.2 on failure, clamped to [0.0, 1.0].
    /// `failed_health_checks` counts only the checks that trip a region
    /// into the unhealthy state.
    async fn check_region_health(&self, region_id: &str) -> Result<()> {
        let start_time = Instant::now();
        self.stats
            .total_health_checks
            .fetch_add(1, Ordering::Relaxed);
        let is_healthy = fastrand::f32() > 0.1;
        let latency = start_time.elapsed();
        let mut health_status = self.health_status.write().await;
        if let Some(health) = health_status.get_mut(region_id) {
            health.last_check = Utc::now();
            health.latency_ms = Some(latency.as_millis() as u64);
            if is_healthy {
                health.is_healthy = true;
                health.last_success = Some(Utc::now());
                health.recent_errors = 0;
                health.health_score = (health.health_score + 0.1).min(1.0);
            } else {
                health.recent_errors += 1;
                health.health_score = (health.health_score - 0.2).max(0.0);
                if health.recent_errors > 3 {
                    health.is_healthy = false;
                    self.stats
                        .failed_health_checks
                        .fetch_add(1, Ordering::Relaxed);
                }
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;
    use std::collections::HashMap;

    // Minimal healthy US-West region config with a single primary endpoint.
    fn create_test_region(id: &str) -> RegionConfig {
        RegionConfig {
            region_id: id.to_string(),
            region_name: format!("Region {id}"),
            location: GeographicLocation {
                country: "US".to_string(),
                region: "California".to_string(),
                city: "San Francisco".to_string(),
                latitude: 37.7749,
                longitude: -122.4194,
                availability_zone: Some("us-west-1a".to_string()),
            },
            endpoints: vec![RegionEndpoint {
                url: format!("https://{id}.example.com"),
                endpoint_type: EndpointType::Primary,
                is_healthy: true,
                last_health_check: Some(Utc::now()),
                auth: None,
            }],
            priority: 1,
            is_write_active: true,
            is_read_active: true,
            replication_mode: ReplicationMode::Asynchronous,
            latency_map: HashMap::new(),
        }
    }

    // A simple TripleAdded event with fresh metadata.
    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "http://test.org/subject".to_string(),
            predicate: "http://test.org/predicate".to_string(),
            object: "\"test_value\"".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: Uuid::new_v4().to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: HashMap::new(),
                checksum: None,
            },
        }
    }

    // The manager records the region it was constructed for.
    #[tokio::test]
    async fn test_replication_manager_creation() {
        let config = ReplicationConfig {
            strategy: ReplicationStrategy::FullReplication,
            conflict_resolution: ConflictResolution::LastWriteWins,
            max_lag_ms: 1000,
            replication_timeout: Duration::from_secs(30),
            enable_compression: true,
            batch_size: 100,
            health_check_interval: Duration::from_secs(60),
            failover_timeout: Duration::from_secs(300),
        };
        let manager = MultiRegionReplicationManager::new(config, "us-west-1".to_string());
        assert_eq!(manager.current_region, "us-west-1");
    }

    // Regions added to the topology are retrievable by id.
    #[tokio::test]
    async fn test_region_management() {
        let config = ReplicationConfig {
            strategy: ReplicationStrategy::FullReplication,
            conflict_resolution: ConflictResolution::LastWriteWins,
            max_lag_ms: 1000,
            replication_timeout: Duration::from_secs(30),
            enable_compression: true,
            batch_size: 100,
            health_check_interval: Duration::from_secs(60),
            failover_timeout: Duration::from_secs(300),
        };
        let manager = MultiRegionReplicationManager::new(config, "us-west-1".to_string());
        manager
            .add_region(create_test_region("us-east-1"))
            .await
            .unwrap();
        manager
            .add_region(create_test_region("eu-west-1"))
            .await
            .unwrap();
        let regions = manager.regions.read().await;
        assert_eq!(regions.len(), 2);
        assert!(regions.contains_key("us-east-1"));
        assert!(regions.contains_key("eu-west-1"));
    }

    // Ticks in different regions are concurrent; after a merge plus a local
    // tick, the merged clock strictly dominates the other.
    #[tokio::test]
    async fn test_vector_clock() {
        let mut clock1 = VectorClock::new();
        let mut clock2 = VectorClock::new();
        clock1.increment("region1");
        clock2.increment("region2");
        assert!(clock1.is_concurrent(&clock2));
        assert!(!clock1.happens_before(&clock2));
        clock1.update(&clock2);
        clock1.increment("region1");
        assert!(clock2.happens_before(&clock1));
        assert!(!clock1.happens_before(&clock2));
    }

    // New regions start healthy; a sweep performs one check per region.
    #[tokio::test]
    async fn test_health_monitor() {
        let monitor = RegionHealthMonitor::new(Duration::from_secs(60));
        monitor.add_region("us-west-1".to_string()).await;
        monitor.add_region("us-east-1".to_string()).await;
        let healthy_regions = monitor.get_healthy_regions().await;
        assert_eq!(healthy_regions.len(), 2);
        monitor.check_all_regions().await.unwrap();
        let stats = &monitor.stats;
        assert!(stats.total_health_checks.load(Ordering::Relaxed) >= 2);
    }

    // Selective-replication config round-trips its event-type set.
    #[test]
    fn test_replication_config() {
        let config = ReplicationConfig {
            strategy: ReplicationStrategy::SelectiveReplication {
                event_types: ["TripleAdded", "TripleRemoved"]
                    .iter()
                    .map(|s| s.to_string())
                    .collect(),
            },
            conflict_resolution: ConflictResolution::RegionPriority {
                priority_order: vec!["us-west-1".to_string(), "us-east-1".to_string()],
            },
            max_lag_ms: 500,
            replication_timeout: Duration::from_secs(15),
            enable_compression: false,
            batch_size: 50,
            health_check_interval: Duration::from_secs(30),
            failover_timeout: Duration::from_secs(120),
        };
        match config.strategy {
            ReplicationStrategy::SelectiveReplication { ref event_types } => {
                assert_eq!(event_types.len(), 2);
            }
            _ => panic!("Wrong strategy type"),
        }
    }
}