use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use torsh_core::error::TorshError;
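/// Consistency level applied when a package write is replicated.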
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConsistencyLevel {
Eventual,
Quorum,
Strong,
Causal,
}
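/// How writes are propagated to replicas. Not yet consumed by the
/// [`ReplicationManager`] in this module, which selects behaviour from
/// [`ConsistencyLevel`] instead.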
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicationStrategy {
Synchronous,
Asynchronous,
SemiSynchronous,
}
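/// Strategy used to resolve conflicting replicas of the same package version.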
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictResolution {
LastWriteWins,
FirstWriteWins,
Custom,
Manual,
}
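/// Health state of a replication node as observed by the manager.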
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeStatus {
Healthy,
Degraded,
Unhealthy,
Maintenance,
Offline,
}
impl Default for NodeStatus {
fn default() -> Self {
Self::Healthy
}
}
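/// A storage node participating in replication. Runtime fields (status, last
/// health check, lag) are skipped during serialization and reset to their
/// defaults when deserialized.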
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationNode {
pub id: String,
pub region: String,
pub endpoint: String,
pub priority: u32,
pub capacity: u64,
#[serde(skip)]
pub status: NodeStatus,
#[serde(skip)]
pub last_health_check: Option<DateTime<Utc>>,
#[serde(skip)]
pub replication_lag_secs: f64,
}
impl ReplicationNode {
pub fn new(id: String, region: String, endpoint: String, priority: u32, capacity: u64) -> Self {
Self {
id,
region,
endpoint,
priority,
capacity,
status: NodeStatus::Healthy,
last_health_check: None,
replication_lag_secs: 0.0,
}
}
}
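/// Tunable replication behaviour: consistency level, replica count, failover,
/// and the background sync interval.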
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
pub consistency: ConsistencyLevel,
pub replication_factor: usize,
pub auto_failover: bool,
pub sync_interval_secs: u64,
}
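/// Metadata describing one replica of a package version on a specific node.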
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaMetadata {
pub package_id: String,
pub version: String,
pub node_id: String,
pub replica_version: u64,
pub checksum: String,
pub last_sync: DateTime<Utc>,
pub size_bytes: u64,
}
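/// A single replication operation queued or executed by the manager.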
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationOperation {
pub id: String,
pub package_id: String,
pub version: String,
pub operation_type: String,
pub source_node: String,
pub target_nodes: Vec<String>,
pub timestamp: DateTime<Utc>,
pub status: OperationStatus,
}
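/// Lifecycle state of a [`ReplicationOperation`].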
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum OperationStatus {
Pending,
InProgress,
Completed,
Failed,
}
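/// A detected divergence between replicas of the same package version.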
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConflict {
pub id: String,
pub package_id: String,
pub version: String,
pub conflicting_replicas: Vec<ReplicaMetadata>,
pub detected_at: DateTime<Utc>,
pub resolved: bool,
pub resolution_strategy: Option<ConflictResolution>,
}
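/// Aggregate counters derived from the manager's current nodes, replicas,
/// operation queue, and open conflicts.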
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplicationStatistics {
pub total_nodes: usize,
pub healthy_nodes: usize,
pub total_replicas: usize,
pub total_operations: u64,
pub successful_operations: u64,
pub failed_operations: u64,
pub active_conflicts: usize,
pub avg_replication_lag_secs: f64,
pub total_bandwidth_bytes: u64,
}
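/// Coordinates package replication across a set of nodes: node membership,
/// replica placement, health checks, synchronization, and conflict resolution.
///
/// A minimal usage sketch, based on the tests below (`ignore`d because crate
/// paths may differ in your build):
///
/// ```ignore
/// let config = ReplicationConfig {
///     consistency: ConsistencyLevel::Eventual,
///     replication_factor: 3,
///     auto_failover: true,
///     sync_interval_secs: 60,
/// };
/// let mut manager = ReplicationManager::new(config);
/// manager.add_node(ReplicationNode::new(
///     "node1".to_string(),
///     "us-east-1".to_string(),
///     "https://node1.example.com".to_string(),
///     1,
///     1024 * 1024 * 1024,
/// ))?;
/// manager.replicate_package("my-package", "1.0.0", b"package bytes")?;
/// ```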
pub struct ReplicationManager {
config: ReplicationConfig,
nodes: HashMap<String, ReplicationNode>,
replicas: HashMap<String, Vec<ReplicaMetadata>>,
operations: VecDeque<ReplicationOperation>,
conflicts: Vec<ReplicationConflict>,
statistics: ReplicationStatistics,
}
impl ReplicationManager {
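    /// Creates a manager with the given configuration and no registered nodes.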
pub fn new(config: ReplicationConfig) -> Self {
Self {
config,
nodes: HashMap::new(),
replicas: HashMap::new(),
operations: VecDeque::new(),
conflicts: Vec::new(),
statistics: ReplicationStatistics::default(),
}
}
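    /// Registers a node; returns an error if a node with the same id already exists.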
pub fn add_node(&mut self, node: ReplicationNode) -> Result<(), TorshError> {
if self.nodes.contains_key(&node.id) {
return Err(TorshError::InvalidArgument(format!(
"Node {} already exists",
node.id
)));
}
self.nodes.insert(node.id.clone(), node);
self.update_statistics();
Ok(())
}
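    /// Removes a node and redistributes the replicas it was holding.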
pub fn remove_node(&mut self, node_id: &str) -> Result<(), TorshError> {
self.nodes
.remove(node_id)
.ok_or_else(|| TorshError::InvalidArgument(format!("Node {} not found", node_id)))?;
self.redistribute_replicas(node_id)?;
self.update_statistics();
Ok(())
}
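    /// Queues a replication operation for a package and dispatches it according
    /// to the configured consistency level. The actual data transfer and replica
    /// bookkeeping are currently stubbed out.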
pub fn replicate_package(
&mut self,
package_id: &str,
version: &str,
_data: &[u8],
) -> Result<(), TorshError> {
let target_nodes = self.select_replication_nodes(package_id)?;
let operation = ReplicationOperation {
id: uuid::Uuid::new_v4().to_string(),
package_id: package_id.to_string(),
version: version.to_string(),
operation_type: "Create".to_string(),
source_node: "primary".to_string(),
target_nodes: target_nodes.iter().map(|n| n.id.clone()).collect(),
timestamp: Utc::now(),
status: OperationStatus::Pending,
};
self.operations.push_back(operation);
match self.config.consistency {
ConsistencyLevel::Strong => self.replicate_synchronously(package_id, version)?,
ConsistencyLevel::Quorum => self.replicate_to_quorum(package_id, version)?,
ConsistencyLevel::Eventual | ConsistencyLevel::Causal => {
self.replicate_asynchronously(package_id, version)?
}
}
self.update_statistics();
Ok(())
}
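    /// Returns the id of the healthiest (lowest-lag) node holding a replica of
    /// the requested package version.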
pub fn get_package(&self, package_id: &str, version: &str) -> Result<String, TorshError> {
let key = format!("{}:{}", package_id, version);
let replicas = self
.replicas
.get(&key)
.ok_or_else(|| TorshError::InvalidArgument(format!("Package {} not found", key)))?;
let best_replica = self.select_best_replica(replicas)?;
Ok(best_replica.node_id.clone())
}
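    /// Refreshes the status of every node and, when `auto_failover` is enabled,
    /// redistributes replicas away from unhealthy nodes.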
pub fn health_check(&mut self) -> Result<(), TorshError> {
let now = Utc::now();
let node_ids: Vec<String> = self.nodes.keys().cloned().collect();
for node_id in node_ids {
let is_healthy = self.check_node_health(&node_id);
if let Some(node) = self.nodes.get_mut(&node_id) {
node.status = if is_healthy {
NodeStatus::Healthy
} else {
NodeStatus::Unhealthy
};
node.last_health_check = Some(now);
}
}
self.update_statistics();
if self.config.auto_failover {
self.handle_failover()?;
}
Ok(())
}
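    /// Brings lagging replicas up to the highest known `replica_version` for
    /// each package.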
pub fn synchronize(&mut self) -> Result<(), TorshError> {
let mut to_sync = Vec::new();
for (key, replicas) in &self.replicas {
let max_version = replicas
.iter()
.map(|r| r.replica_version)
.max()
.unwrap_or(0);
for replica in replicas {
if replica.replica_version < max_version {
to_sync.push((key.clone(), replica.node_id.clone()));
}
}
}
for (key, node_id) in to_sync {
self.sync_replica(&key, &node_id)?;
}
Ok(())
}
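    /// Resolves open conflicts (currently always last-write-wins), propagates
    /// the winning replicas, and drops resolved conflicts from the list.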
pub fn resolve_conflicts(&mut self) -> Result<(), TorshError> {
let mut replicas_to_propagate = Vec::new();
for conflict in &mut self.conflicts {
if !conflict.resolved {
                // Default to last-write-wins; per-conflict strategies are not
                // configurable yet.
                let strategy = ConflictResolution::LastWriteWins;
match strategy {
ConflictResolution::LastWriteWins => {
if let Some(latest) = conflict
.conflicting_replicas
.iter()
.max_by_key(|r| r.last_sync)
.cloned()
{
replicas_to_propagate.push(latest);
conflict.resolved = true;
conflict.resolution_strategy = Some(strategy);
}
}
ConflictResolution::FirstWriteWins => {
if let Some(earliest) = conflict
.conflicting_replicas
.iter()
.min_by_key(|r| r.last_sync)
.cloned()
{
replicas_to_propagate.push(earliest);
conflict.resolved = true;
conflict.resolution_strategy = Some(strategy);
}
}
                    ConflictResolution::Custom | ConflictResolution::Manual => {
                        // Custom and manual resolution require an external
                        // resolver or operator action, so nothing is done here.
                    }
}
}
}
for replica in replicas_to_propagate {
self.propagate_replica(&replica)?;
}
self.conflicts.retain(|c| !c.resolved);
Ok(())
}
pub fn get_statistics(&self) -> &ReplicationStatistics {
&self.statistics
}
pub fn get_node_status(&self, node_id: &str) -> Option<NodeStatus> {
self.nodes.get(node_id).map(|n| n.status)
}
pub fn list_nodes(&self) -> Vec<&ReplicationNode> {
self.nodes.values().collect()
}
pub fn list_replicas(&self, package_id: &str, version: &str) -> Vec<&ReplicaMetadata> {
let key = format!("{}:{}", package_id, version);
self.replicas
.get(&key)
.map(|replicas| replicas.iter().collect())
.unwrap_or_default()
}
pub fn get_conflicts(&self) -> Vec<&ReplicationConflict> {
self.conflicts.iter().collect()
}
fn select_replication_nodes(
&self,
_package_id: &str,
) -> Result<Vec<&ReplicationNode>, TorshError> {
let healthy_nodes: Vec<&ReplicationNode> = self
.nodes
.values()
.filter(|n| n.status == NodeStatus::Healthy)
.collect();
if healthy_nodes.is_empty() {
return Err(TorshError::RuntimeError(
"No healthy nodes available".to_string(),
));
}
        // Prefer higher-priority nodes before truncating to the replication
        // factor; sorting after `take` would keep an arbitrary subset.
        let mut selected = healthy_nodes;
        selected.sort_by(|a, b| b.priority.cmp(&a.priority));
        selected.truncate(self.config.replication_factor);
        Ok(selected)
}
fn replicate_synchronously(
&mut self,
_package_id: &str,
_version: &str,
) -> Result<(), TorshError> {
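        // Placeholder: a real implementation would block until every target
        // node acknowledges the write before marking the operation complete.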
if let Some(op) = self.operations.back_mut() {
op.status = OperationStatus::Completed;
}
Ok(())
}
fn replicate_to_quorum(&mut self, _package_id: &str, _version: &str) -> Result<(), TorshError> {
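        // Placeholder: a real implementation would wait for a majority of
        // target nodes to acknowledge the write.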
if let Some(op) = self.operations.back_mut() {
op.status = OperationStatus::Completed;
}
Ok(())
}
fn replicate_asynchronously(
&mut self,
_package_id: &str,
_version: &str,
) -> Result<(), TorshError> {
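        // Placeholder: a real implementation would enqueue the transfer and
        // complete it in the background.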
if let Some(op) = self.operations.back_mut() {
op.status = OperationStatus::Completed;
}
Ok(())
}
fn select_best_replica<'a>(
&self,
replicas: &'a [ReplicaMetadata],
) -> Result<&'a ReplicaMetadata, TorshError> {
replicas
.iter()
.filter(|r| {
self.nodes
.get(&r.node_id)
.map(|n| n.status == NodeStatus::Healthy)
.unwrap_or(false)
})
            .min_by(|a, b| {
                let a_lag = self
                    .nodes
                    .get(&a.node_id)
                    .map(|n| n.replication_lag_secs)
                    .unwrap_or(f64::MAX);
                let b_lag = self
                    .nodes
                    .get(&b.node_id)
                    .map(|n| n.replication_lag_secs)
                    .unwrap_or(f64::MAX);
                // `total_cmp` imposes a total order on f64, so this cannot
                // panic even if a lag value is ever NaN.
                a_lag.total_cmp(&b_lag)
            })
.ok_or_else(|| TorshError::RuntimeError("No healthy replicas".to_string()))
}
fn check_node_health(&self, _node_id: &str) -> bool {
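        // Placeholder: a real implementation would probe the node's endpoint;
        // every registered node is currently reported as healthy.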
true
}
fn handle_failover(&mut self) -> Result<(), TorshError> {
let unhealthy_nodes: Vec<String> = self
.nodes
.iter()
.filter(|(_, n)| n.status == NodeStatus::Unhealthy)
.map(|(id, _)| id.clone())
.collect();
for node_id in unhealthy_nodes {
self.redistribute_replicas(&node_id)?;
}
Ok(())
}
fn redistribute_replicas(&mut self, node_id: &str) -> Result<(), TorshError> {
let mut replicas_to_move = Vec::new();
for (key, replicas) in &self.replicas {
if replicas.iter().any(|r| r.node_id == node_id) {
replicas_to_move.push(key.clone());
}
}
for key in replicas_to_move {
if let Some(replicas) = self.replicas.get_mut(&key) {
replicas.retain(|r| r.node_id != node_id);
                if replicas.len() < self.config.replication_factor {
                    // Placeholder: a full implementation would schedule
                    // re-replication here to restore the replication factor.
                }
}
}
Ok(())
}
fn sync_replica(&mut self, _key: &str, _node_id: &str) -> Result<(), TorshError> {
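        // Placeholder: would copy the newest replica data to the lagging node.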
Ok(())
}
fn propagate_replica(&mut self, _replica: &ReplicaMetadata) -> Result<(), TorshError> {
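        // Placeholder: would push the winning replica to every other node that
        // holds this package version.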
Ok(())
}
    fn update_statistics(&mut self) {
        let healthy_nodes = self
            .nodes
            .values()
            .filter(|n| n.status == NodeStatus::Healthy)
            .count();
        let total_lag: f64 = self.nodes.values().map(|n| n.replication_lag_secs).sum();
        let avg_replication_lag_secs = if self.nodes.is_empty() {
            0.0
        } else {
            total_lag / self.nodes.len() as f64
        };
        self.statistics = ReplicationStatistics {
            total_nodes: self.nodes.len(),
            healthy_nodes,
            total_replicas: self.replicas.values().map(|v| v.len()).sum(),
            total_operations: self.operations.len() as u64,
            successful_operations: self
                .operations
                .iter()
                .filter(|op| op.status == OperationStatus::Completed)
                .count() as u64,
            failed_operations: self
                .operations
                .iter()
                .filter(|op| op.status == OperationStatus::Failed)
                .count() as u64,
            active_conflicts: self.conflicts.len(),
            avg_replication_lag_secs,
            ..Default::default()
        };
    }
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_config() -> ReplicationConfig {
ReplicationConfig {
consistency: ConsistencyLevel::Eventual,
replication_factor: 3,
auto_failover: true,
sync_interval_secs: 60,
}
}
#[test]
fn test_replication_manager_creation() {
let config = create_test_config();
let manager = ReplicationManager::new(config);
let stats = manager.get_statistics();
assert_eq!(stats.total_nodes, 0);
}
#[test]
fn test_add_node() {
let config = create_test_config();
let mut manager = ReplicationManager::new(config);
let node = ReplicationNode::new(
"node1".to_string(),
"us-east-1".to_string(),
"https://node1.example.com".to_string(),
1,
1024 * 1024 * 1024,
);
manager.add_node(node).unwrap();
let stats = manager.get_statistics();
assert_eq!(stats.total_nodes, 1);
assert_eq!(stats.healthy_nodes, 1);
}
#[test]
fn test_remove_node() {
let config = create_test_config();
let mut manager = ReplicationManager::new(config);
let node = ReplicationNode::new(
"node1".to_string(),
"us-east-1".to_string(),
"https://node1.example.com".to_string(),
1,
1024 * 1024 * 1024,
);
manager.add_node(node).unwrap();
assert_eq!(manager.get_statistics().total_nodes, 1);
manager.remove_node("node1").unwrap();
assert_eq!(manager.get_statistics().total_nodes, 0);
}
#[test]
fn test_replicate_package() {
let config = create_test_config();
let mut manager = ReplicationManager::new(config);
for i in 1..=3 {
let node = ReplicationNode::new(
format!("node{}", i),
"us-east-1".to_string(),
format!("https://node{}.example.com", i),
i,
1024 * 1024 * 1024,
);
manager.add_node(node).unwrap();
}
let result = manager.replicate_package("test-pkg", "1.0.0", b"data");
assert!(result.is_ok());
let stats = manager.get_statistics();
assert!(stats.successful_operations > 0);
}
#[test]
fn test_health_check() {
let config = create_test_config();
let mut manager = ReplicationManager::new(config);
let node = ReplicationNode::new(
"node1".to_string(),
"us-east-1".to_string(),
"https://node1.example.com".to_string(),
1,
1024 * 1024 * 1024,
);
manager.add_node(node).unwrap();
manager.health_check().unwrap();
let status = manager.get_node_status("node1");
assert!(status.is_some());
}
#[test]
fn test_list_nodes() {
let config = create_test_config();
let mut manager = ReplicationManager::new(config);
for i in 1..=3 {
let node = ReplicationNode::new(
format!("node{}", i),
"us-east-1".to_string(),
format!("https://node{}.example.com", i),
i,
1024 * 1024 * 1024,
);
manager.add_node(node).unwrap();
}
let nodes = manager.list_nodes();
assert_eq!(nodes.len(), 3);
}
#[test]
fn test_consistency_levels() {
let configs = vec![
ConsistencyLevel::Eventual,
ConsistencyLevel::Quorum,
ConsistencyLevel::Strong,
ConsistencyLevel::Causal,
];
for consistency in configs {
let config = ReplicationConfig {
consistency,
replication_factor: 3,
auto_failover: true,
sync_interval_secs: 60,
};
let manager = ReplicationManager::new(config);
assert_eq!(manager.config.consistency, consistency);
}
}
#[test]
fn test_replication_statistics() {
let config = create_test_config();
let mut manager = ReplicationManager::new(config);
for i in 1..=5 {
let node = ReplicationNode::new(
format!("node{}", i),
"us-east-1".to_string(),
format!("https://node{}.example.com", i),
i,
1024 * 1024 * 1024,
);
manager.add_node(node).unwrap();
}
manager.replicate_package("pkg1", "1.0.0", b"data").unwrap();
manager.replicate_package("pkg2", "1.0.0", b"data").unwrap();
let stats = manager.get_statistics();
assert_eq!(stats.total_nodes, 5);
assert_eq!(stats.healthy_nodes, 5);
assert!(stats.successful_operations >= 2);
}
}