use crate::agent::AgentId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use thiserror::Error;
/// Identifier of a node taking part in replication; a plain `String` alias.
pub type NodeId = String;
/// Errors reported by the replication layer.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum ReplicationError {
/// Generic failure carrying a human-readable reason (for example a lock
/// that could not be acquired, or no replica being created at all).
#[error("Replication failed: {0}")]
ReplicationFailed(String),
/// A node could not be found; the payload is included in the message.
#[error("Node not found: {0}")]
NodeNotFound(String),
/// A state mismatch was detected; the payload is included in the message.
#[error("State mismatch: {0}")]
StateMismatch(String),
/// Replication timed out.
#[error("Replication timeout")]
Timeout,
/// Fewer replicas were created (`current`) than were needed (`required`).
#[error("Insufficient replicas: {current}/{required}")]
InsufficientReplicas { current: usize, required: usize },
}
/// Shorthand for a `Result` whose error type is [`ReplicationError`].
pub type ReplicationResult<T> = Result<T, ReplicationError>;
/// Policy that decides how many successful replicas a replication run needs
/// and when the run may stop contacting further targets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationStrategy {
/// Every target must receive a replica, otherwise the run is an error.
Synchronous,
/// The run stops contacting targets after the first successful replica;
/// at least one replica is required.
Asynchronous,
/// All targets are tried; at least `min_replicas` must succeed.
Quorum { min_replicas: usize },
/// All targets are tried; at least one replica is required.
BestEffort,
}
/// Replication status tracked per agent.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicationState {
/// No replication has been recorded; reported for agents without an entry.
Idle,
/// A replication run has started and has not produced a final state yet.
Replicating,
/// The last run created a replica on every target.
Synchronized,
/// Intended to mark a replication attempt that failed.
Failed,
/// The last run met the strategy's requirement but not every target
/// received a replica.
Degraded,
}
/// Settings that control how a `ReplicationManager` replicates state.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
/// Success/stop policy applied to each replication run.
pub strategy: ReplicationStrategy,
/// Desired number of replicas.
// NOTE(review): not read by the replication code visible in this file.
pub replication_factor: usize,
/// Time budget for a replication run.
// NOTE(review): not read by the replication code visible in this file.
pub timeout: Duration,
/// When `true`, a run-length compression pass is run over the state bytes
/// during replication.
pub enable_compression: bool,
/// When `true`, a checksum is computed for replicated bytes and checked by
/// replica verification; when `false`, verification always succeeds.
pub verify_checksum: bool,
}
impl Default for ReplicationConfig {
fn default() -> Self {
Self {
strategy: ReplicationStrategy::Quorum { min_replicas: 2 },
replication_factor: 3,
timeout: Duration::from_secs(10),
enable_compression: true,
verify_checksum: true,
}
}
}
/// One copy of an agent's state as held for a particular node.
#[derive(Debug, Clone)]
pub struct StateReplica {
/// Node this replica belongs to.
pub node_id: NodeId,
/// The replicated state bytes.
pub state_data: Vec<u8>,
/// Checksum recorded for `state_data` when the replica was created
/// (`0` when checksum verification is disabled in the configuration).
pub checksum: u64,
/// Moment the replica was created.
pub last_updated: Instant,
/// Whether the replica is considered in sync; set to `true` on creation.
pub synchronized: bool,
}
/// Record of one completed replication run.
#[derive(Debug, Clone)]
pub struct ReplicationLog {
/// Agent whose state was replicated.
pub agent_id: AgentId,
/// Node the state was replicated from.
pub source_node: NodeId,
/// Targets that were requested for the run.
pub target_nodes: Vec<NodeId>,
/// Moment the run started.
pub timestamp: Instant,
/// Number of targets for which a replica was created.
pub success_count: usize,
/// Number of requested targets without a replica
/// (`target_nodes.len() - success_count`).
pub failure_count: usize,
/// Time elapsed between the start of the run and the creation of this record.
pub duration: Duration,
}
/// Keeps per-agent replicas, per-agent replication state and a log of
/// replication runs, each behind its own `Arc<RwLock<..>>`.
pub struct ReplicationManager {
// Settings applied to every replication run.
config: ReplicationConfig,
// Replicas created by the most recent run, keyed by agent.
replicas: Arc<RwLock<HashMap<AgentId, Vec<StateReplica>>>>,
// One entry per successfully completed replication run, in insertion order.
logs: Arc<RwLock<Vec<ReplicationLog>>>,
// Last recorded replication state per agent.
states: Arc<RwLock<HashMap<AgentId, ReplicationState>>>,
}
impl ReplicationManager {
pub fn new(config: ReplicationConfig) -> Self {
Self {
config,
replicas: Arc::new(RwLock::new(HashMap::new())),
logs: Arc::new(RwLock::new(Vec::new())),
states: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn replicate_state(
&self,
agent_id: &AgentId,
state_data: Vec<u8>,
target_nodes: Vec<NodeId>,
) -> ReplicationResult<ReplicationLog> {
let start_time = Instant::now();
self.set_state(agent_id, ReplicationState::Replicating)?;
let checksum = if self.config.verify_checksum {
self.calculate_checksum(&state_data)
} else {
0
};
let _data_to_replicate = if self.config.enable_compression {
self.compress_data(&state_data)?
} else {
state_data.clone()
};
let mut success_count = 0;
let mut replicas = Vec::new();
for node_id in &target_nodes {
match self.replicate_to_node(agent_id, node_id, &state_data, checksum) {
Ok(replica) => {
replicas.push(replica);
success_count += 1;
}
Err(_) => {
}
}
if self.should_stop_replication(success_count, target_nodes.len())? {
break;
}
}
self.replicas
.write()
.map_err(|_| ReplicationError::ReplicationFailed("Failed to acquire lock".to_string()))?
.insert(*agent_id, replicas);
let final_state = self.evaluate_replication_result(success_count, target_nodes.len())?;
self.set_state(agent_id, final_state)?;
let target_nodes_len = target_nodes.len();
let log = ReplicationLog {
agent_id: *agent_id,
source_node: "local".to_string(),
target_nodes,
timestamp: start_time,
success_count,
failure_count: target_nodes_len - success_count,
duration: start_time.elapsed(),
};
self.logs
.write()
.map_err(|_| ReplicationError::ReplicationFailed("Failed to acquire lock".to_string()))?
.push(log.clone());
Ok(log)
}
fn replicate_to_node(
&self,
_agent_id: &AgentId,
node_id: &NodeId,
data: &[u8],
checksum: u64,
) -> ReplicationResult<StateReplica> {
Ok(StateReplica {
node_id: node_id.clone(),
state_data: data.to_vec(),
checksum,
last_updated: Instant::now(),
synchronized: true,
})
}
fn should_stop_replication(
&self,
success_count: usize,
total_targets: usize,
) -> ReplicationResult<bool> {
match &self.config.strategy {
ReplicationStrategy::Synchronous => Ok(success_count == total_targets),
ReplicationStrategy::Asynchronous => Ok(success_count > 0),
ReplicationStrategy::Quorum { .. } => Ok(false), ReplicationStrategy::BestEffort => Ok(false), }
}
fn evaluate_replication_result(
&self,
success_count: usize,
total_targets: usize,
) -> ReplicationResult<ReplicationState> {
match &self.config.strategy {
ReplicationStrategy::Synchronous => {
if success_count == total_targets {
Ok(ReplicationState::Synchronized)
} else {
Err(ReplicationError::InsufficientReplicas {
current: success_count,
required: total_targets,
})
}
}
ReplicationStrategy::Asynchronous | ReplicationStrategy::BestEffort => {
if success_count > 0 {
Ok(if success_count == total_targets {
ReplicationState::Synchronized
} else {
ReplicationState::Degraded
})
} else {
Err(ReplicationError::ReplicationFailed(
"No replicas created".to_string(),
))
}
}
ReplicationStrategy::Quorum { min_replicas } => {
if success_count >= *min_replicas {
Ok(if success_count == total_targets {
ReplicationState::Synchronized
} else {
ReplicationState::Degraded
})
} else {
Err(ReplicationError::InsufficientReplicas {
current: success_count,
required: *min_replicas,
})
}
}
}
}
pub fn get_replicas(&self, agent_id: &AgentId) -> ReplicationResult<Vec<StateReplica>> {
Ok(self
.replicas
.read()
.map_err(|_| ReplicationError::ReplicationFailed("Failed to acquire lock".to_string()))?
.get(agent_id)
.cloned()
.unwrap_or_default())
}
pub fn get_state(&self, agent_id: &AgentId) -> ReplicationResult<ReplicationState> {
Ok(self
.states
.read()
.map_err(|_| ReplicationError::ReplicationFailed("Failed to acquire lock".to_string()))?
.get(agent_id)
.cloned()
.unwrap_or(ReplicationState::Idle))
}
fn set_state(&self, agent_id: &AgentId, state: ReplicationState) -> ReplicationResult<()> {
self.states
.write()
.map_err(|_| ReplicationError::ReplicationFailed("Failed to acquire lock".to_string()))?
.insert(*agent_id, state);
Ok(())
}
pub fn get_logs(&self) -> ReplicationResult<Vec<ReplicationLog>> {
Ok(self
.logs
.read()
.map_err(|_| ReplicationError::ReplicationFailed("Failed to acquire lock".to_string()))?
.clone())
}
fn calculate_checksum(&self, data: &[u8]) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
data.hash(&mut hasher);
hasher.finish()
}
fn compress_data(&self, data: &[u8]) -> ReplicationResult<Vec<u8>> {
let mut compressed = Vec::new();
let mut count = 1u8;
let mut current = data.first().copied();
for &byte in data.iter().skip(1) {
if Some(byte) == current && count < 255 {
count += 1;
} else {
if let Some(c) = current {
compressed.push(count);
compressed.push(c);
}
current = Some(byte);
count = 1;
}
}
if let Some(c) = current {
compressed.push(count);
compressed.push(c);
}
Ok(compressed)
}
pub fn verify_replica(&self, replica: &StateReplica) -> ReplicationResult<bool> {
if !self.config.verify_checksum {
return Ok(true);
}
let calculated_checksum = self.calculate_checksum(&replica.state_data);
Ok(calculated_checksum == replica.checksum)
}
pub fn synchronize(&self, agent_id: &AgentId) -> ReplicationResult<usize> {
let replicas = self.get_replicas(agent_id)?;
let mut synchronized_count = 0;
for replica in replicas {
if self.verify_replica(&replica)? {
synchronized_count += 1;
}
}
Ok(synchronized_count)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::Agent;

    /// Constructing a manager from the default configuration must not panic.
    #[test]
    fn test_replication_manager_creation() {
        let config = ReplicationConfig::default();
        let _manager = ReplicationManager::new(config);
    }

    /// A quorum run over three targets replicates to all of them and leaves
    /// the agent synchronized.
    #[test]
    fn test_replicate_state() {
        let config = ReplicationConfig {
            strategy: ReplicationStrategy::Quorum { min_replicas: 2 },
            replication_factor: 3,
            ..Default::default()
        };
        let manager = ReplicationManager::new(config);
        let agent = Agent::new(vec![0x00, 0x61, 0x73, 0x6d]);
        let state_data = vec![1, 2, 3, 4, 5];
        let targets = vec![
            "node1".to_string(),
            "node2".to_string(),
            "node3".to_string(),
        ];
        let log = manager
            .replicate_state(&agent.id(), state_data, targets)
            .expect("replicate state");
        assert_eq!(log.success_count, 3);
        assert_eq!(log.failure_count, 0);
        assert_eq!(
            manager.get_state(&agent.id()).expect("get state"),
            ReplicationState::Synchronized
        );
    }

    /// A quorum that cannot be reached is reported as `InsufficientReplicas`
    /// and produces no log entry.
    #[test]
    fn test_quorum_not_met() {
        let config = ReplicationConfig {
            strategy: ReplicationStrategy::Quorum { min_replicas: 2 },
            ..Default::default()
        };
        let manager = ReplicationManager::new(config);
        let agent = Agent::new(vec![0x00, 0x61, 0x73, 0x6d]);
        let err = manager
            .replicate_state(&agent.id(), vec![1, 2, 3], vec!["node1".to_string()])
            .expect_err("one target cannot satisfy a quorum of two");
        assert!(matches!(
            err,
            ReplicationError::InsufficientReplicas {
                current: 1,
                required: 2
            }
        ));
        assert!(manager.get_logs().expect("get logs").is_empty());
    }

    /// Synchronous replication succeeds only when every target succeeds.
    #[test]
    fn test_synchronous_replication() {
        let config = ReplicationConfig {
            strategy: ReplicationStrategy::Synchronous,
            ..Default::default()
        };
        let manager = ReplicationManager::new(config);
        let agent = Agent::new(vec![0x00, 0x61, 0x73, 0x6d]);
        let state_data = vec![1, 2, 3, 4, 5];
        let targets = vec!["node1".to_string(), "node2".to_string()];
        let log = manager
            .replicate_state(&agent.id(), state_data, targets)
            .expect("replicate state");
        assert_eq!(log.success_count, 2);
        assert_eq!(log.failure_count, 0);
    }

    /// Replicas are stored per target node and carry the raw state bytes.
    #[test]
    fn test_get_replicas() {
        let config = ReplicationConfig::default();
        let manager = ReplicationManager::new(config);
        let agent = Agent::new(vec![0x00, 0x61, 0x73, 0x6d]);
        let state_data = vec![1, 2, 3, 4, 5];
        let targets = vec!["node1".to_string(), "node2".to_string()];
        manager
            .replicate_state(&agent.id(), state_data.clone(), targets)
            .expect("replicate state");
        let replicas = manager.get_replicas(&agent.id()).expect("get replicas");
        assert_eq!(replicas.len(), 2);
        assert_eq!(replicas[0].node_id, "node1");
        assert_eq!(replicas[1].node_id, "node2");
        assert!(replicas.iter().all(|r| r.state_data == state_data));
    }

    /// A replica verifies against its own checksum and stops verifying once
    /// its bytes are altered.
    #[test]
    fn test_checksum_verification() {
        let config = ReplicationConfig {
            verify_checksum: true,
            ..Default::default()
        };
        let manager = ReplicationManager::new(config);
        let data = vec![1, 2, 3, 4, 5];
        let checksum = manager.calculate_checksum(&data);
        let replica = StateReplica {
            node_id: "node1".to_string(),
            state_data: data,
            checksum,
            last_updated: Instant::now(),
            synchronized: true,
        };
        assert!(manager.verify_replica(&replica).expect("verify replica"));

        let mut tampered = replica.clone();
        tampered.state_data.push(6);
        assert!(!manager.verify_replica(&tampered).expect("verify tampered"));
    }

    /// With verification disabled every replica is accepted.
    #[test]
    fn test_checksum_verification_disabled() {
        let config = ReplicationConfig {
            verify_checksum: false,
            ..Default::default()
        };
        let manager = ReplicationManager::new(config);
        let replica = StateReplica {
            node_id: "node1".to_string(),
            state_data: vec![1, 2, 3],
            checksum: 0,
            last_updated: Instant::now(),
            synchronized: true,
        };
        assert!(manager.verify_replica(&replica).expect("verify replica"));
    }

    /// Compression emits `(count, byte)` pairs and maps empty input to empty
    /// output.
    #[test]
    fn test_compression() {
        let config = ReplicationConfig::default();
        let manager = ReplicationManager::new(config);
        let data = vec![1, 1, 1, 2, 2, 3];
        let compressed = manager.compress_data(&data).expect("compress");
        assert_eq!(compressed, vec![3, 1, 2, 2, 1, 3]);

        let empty = manager.compress_data(&[]).expect("compress empty");
        assert!(empty.is_empty());
    }

    /// Each successful run appends exactly one log entry describing it.
    #[test]
    fn test_replication_logs() {
        let config = ReplicationConfig {
            strategy: ReplicationStrategy::BestEffort,
            ..Default::default()
        };
        let manager = ReplicationManager::new(config);
        let agent = Agent::new(vec![0x00, 0x61, 0x73, 0x6d]);
        let state_data = vec![1, 2, 3, 4, 5];
        let targets = vec!["node1".to_string()];
        manager
            .replicate_state(&agent.id(), state_data, targets)
            .expect("replicate state");
        let logs = manager.get_logs().expect("get logs");
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].success_count, 1);
        assert_eq!(logs[0].failure_count, 0);
        assert_eq!(logs[0].target_nodes, vec!["node1".to_string()]);
    }

    /// Every freshly created replica passes verification.
    #[test]
    fn test_synchronize() {
        let config = ReplicationConfig::default();
        let manager = ReplicationManager::new(config);
        let agent = Agent::new(vec![0x00, 0x61, 0x73, 0x6d]);
        let state_data = vec![1, 2, 3, 4, 5];
        let targets = vec!["node1".to_string(), "node2".to_string()];
        manager
            .replicate_state(&agent.id(), state_data, targets)
            .expect("replicate state");
        let sync_count = manager.synchronize(&agent.id()).expect("synchronize");
        assert_eq!(sync_count, 2);
    }
}