use crate::distributed::shard::{EdgeData, GraphShard, NodeData, NodeId, ShardId};
use crate::{GraphError, Result};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use ruvector_replication::{
Replica, ReplicaRole, ReplicaSet, ReplicationLog, SyncManager, SyncMode,
};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Strategy that decides how graph data is copied across shards.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicationStrategy {
    /// Replicate every operation to all replicas of the owning shard.
    FullShard,
    /// Replicate low-degree nodes to their shard only; nodes whose degree
    /// exceeds the configured threshold are copied to multiple shards.
    VertexCut,
    /// Subgraph-level replication (currently handled like FullShard in
    /// `replicate_node_add` — see the match there).
    Subgraph,
    /// Mixed strategy (currently handled like FullShard in
    /// `replicate_node_add` — see the match there).
    Hybrid,
}
/// Tuning knobs for graph replication.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphReplicationConfig {
    /// Number of copies kept for each shard (primary included in quorum math).
    pub replication_factor: usize,
    /// How data is distributed across replicas.
    pub strategy: ReplicationStrategy,
    /// Degree at or above which a node is treated as "high-degree" under
    /// the VertexCut strategy.
    pub high_degree_threshold: usize,
    /// Synchronous vs. asynchronous propagation (forwarded to the SyncManager).
    pub sync_mode: SyncMode,
    /// Whether conflicting concurrent writes should be reconciled.
    // NOTE(review): not read anywhere in this module yet — confirm intended use.
    pub enable_conflict_resolution: bool,
    /// Replication operation timeout, in seconds.
    // NOTE(review): not read anywhere in this module yet — confirm intended use.
    pub timeout_seconds: u64,
}
impl Default for GraphReplicationConfig {
fn default() -> Self {
Self {
replication_factor: 3,
strategy: ReplicationStrategy::FullShard,
high_degree_threshold: 100,
sync_mode: SyncMode::Async,
enable_conflict_resolution: true,
timeout_seconds: 30,
}
}
}
/// Coordinates replication of graph shards: tracks per-shard replica sets,
/// sync managers, node degrees, and where high-degree nodes were copied.
/// All maps are concurrent (`DashMap`) and shared via `Arc`, so the struct
/// can be used from multiple tasks without external locking.
pub struct GraphReplication {
    config: GraphReplicationConfig,
    /// Replica topology per shard, keyed by shard id.
    replica_sets: Arc<DashMap<ShardId, Arc<ReplicaSet>>>,
    /// Sync manager per shard, keyed by shard id.
    sync_managers: Arc<DashMap<ShardId, Arc<SyncManager>>>,
    /// Observed edge-degree counter per node (incremented on edge adds).
    high_degree_nodes: Arc<DashMap<NodeId, usize>>,
    /// For high-degree nodes: the list of shard ids (as strings) holding copies.
    node_replicas: Arc<DashMap<NodeId, Vec<String>>>,
}
impl GraphReplication {
    /// Creates a replication coordinator with empty per-shard state.
    pub fn new(config: GraphReplicationConfig) -> Self {
        Self {
            config,
            replica_sets: Arc::new(DashMap::new()),
            sync_managers: Arc::new(DashMap::new()),
            high_degree_nodes: Arc::new(DashMap::new()),
            node_replicas: Arc::new(DashMap::new()),
        }
    }

    /// Registers the replica set and sync manager for `shard_id`.
    ///
    /// `primary_node` becomes the primary replica; each entry of
    /// `replica_nodes` is added as a secondary. Replicas are addressed as
    /// `"<node>:9001"` — the port is hard-coded.
    ///
    /// # Errors
    /// Returns `GraphError::ReplicationError` if adding any replica fails.
    pub fn initialize_shard_replication(
        &self,
        shard_id: ShardId,
        primary_node: String,
        replica_nodes: Vec<String>,
    ) -> Result<()> {
        info!(
            "Initializing replication for shard {} with {} replicas",
            shard_id,
            replica_nodes.len()
        );
        let mut replica_set = ReplicaSet::new(format!("shard-{}", shard_id));
        // NOTE(review): replica port 9001 is hard-coded; consider promoting
        // it to GraphReplicationConfig.
        replica_set
            .add_replica(
                &primary_node,
                &format!("{}:9001", primary_node),
                ReplicaRole::Primary,
            )
            .map_err(GraphError::ReplicationError)?;
        for (idx, node) in replica_nodes.iter().enumerate() {
            replica_set
                .add_replica(
                    &format!("{}-replica-{}", node, idx),
                    &format!("{}:9001", node),
                    ReplicaRole::Secondary,
                )
                .map_err(GraphError::ReplicationError)?;
        }
        let replica_set = Arc::new(replica_set);
        let log = Arc::new(ReplicationLog::new(&primary_node));
        let sync_manager = Arc::new(SyncManager::new(Arc::clone(&replica_set), log));
        sync_manager.set_sync_mode(self.config.sync_mode.clone());
        self.replica_sets.insert(shard_id, replica_set);
        self.sync_managers.insert(shard_id, sync_manager);
        Ok(())
    }

    /// Replicates a node addition according to the configured strategy.
    ///
    /// Under `VertexCut`, nodes whose *currently observed* degree meets the
    /// threshold are fanned out to multiple shards; all other strategies
    /// currently replicate to the owning shard only.
    pub async fn replicate_node_add(&self, shard_id: ShardId, node: NodeData) -> Result<()> {
        debug!(
            "Replicating node addition: {} to shard {}",
            node.id, shard_id
        );
        match self.config.strategy {
            ReplicationStrategy::FullShard => {
                self.replicate_to_shard(shard_id, ReplicationOp::AddNode(node))
                    .await
            }
            ReplicationStrategy::VertexCut => {
                let degree = self.get_node_degree(&node.id);
                if degree >= self.config.high_degree_threshold {
                    self.replicate_high_degree_node(node).await
                } else {
                    self.replicate_to_shard(shard_id, ReplicationOp::AddNode(node))
                        .await
                }
            }
            // Subgraph/Hybrid are not specialized yet; fall back to full-shard.
            ReplicationStrategy::Subgraph | ReplicationStrategy::Hybrid => {
                self.replicate_to_shard(shard_id, ReplicationOp::AddNode(node))
                    .await
            }
        }
    }

    /// Replicates an edge addition and bumps the degree counters of both
    /// endpoints (used by the VertexCut strategy).
    pub async fn replicate_edge_add(&self, shard_id: ShardId, edge: EdgeData) -> Result<()> {
        debug!(
            "Replicating edge addition: {} to shard {}",
            edge.id, shard_id
        );
        self.increment_node_degree(&edge.from);
        self.increment_node_degree(&edge.to);
        self.replicate_to_shard(shard_id, ReplicationOp::AddEdge(edge))
            .await
    }

    /// Replicates a node deletion to the owning shard.
    // NOTE(review): degree counters and node_replicas entries for the node
    // are not cleaned up here — confirm whether that is intentional.
    pub async fn replicate_node_delete(&self, shard_id: ShardId, node_id: NodeId) -> Result<()> {
        debug!(
            "Replicating node deletion: {} from shard {}",
            node_id, shard_id
        );
        self.replicate_to_shard(shard_id, ReplicationOp::DeleteNode(node_id))
            .await
    }

    /// Replicates an edge deletion to the owning shard.
    pub async fn replicate_edge_delete(&self, shard_id: ShardId, edge_id: String) -> Result<()> {
        debug!(
            "Replicating edge deletion: {} from shard {}",
            edge_id, shard_id
        );
        self.replicate_to_shard(shard_id, ReplicationOp::DeleteEdge(edge_id))
            .await
    }

    /// Serializes `op` for the replicas of `shard_id`.
    ///
    /// # Errors
    /// `GraphError::ShardError` if the shard was never initialized;
    /// `GraphError::SerializationError` if encoding fails.
    async fn replicate_to_shard(&self, shard_id: ShardId, op: ReplicationOp) -> Result<()> {
        // Fail fast if replication was never initialized for this shard.
        if !self.sync_managers.contains_key(&shard_id) {
            return Err(GraphError::ShardError(format!(
                "Shard {} not initialized",
                shard_id
            )));
        }
        let data = bincode::encode_to_vec(&op, bincode::config::standard())
            .map_err(|e| GraphError::SerializationError(e.to_string()))?;
        // TODO: actually ship `data` through the shard's SyncManager; today
        // the payload is encoded and then dropped.
        debug!(
            "Replicating operation ({} bytes) for shard {}",
            data.len(),
            shard_id
        );
        Ok(())
    }

    /// Fans a high-degree node out to the first
    /// `min(degree / threshold, replication_factor)` shard ids and records
    /// where copies went in `node_replicas`.
    // NOTE(review): target shards are simply ids 0..N — confirm this matches
    // the actual shard topology.
    async fn replicate_high_degree_node(&self, node: NodeData) -> Result<()> {
        info!(
            "Replicating high-degree node {} to multiple shards",
            node.id
        );
        let degree = self.get_node_degree(&node.id);
        let replica_count =
            (degree / self.config.high_degree_threshold).min(self.config.replication_factor);
        let replica_shards: Vec<ShardId> =
            (0..replica_count).map(|id| id as ShardId).collect();
        // Iterate by reference so the vector is not cloned just to be reused below.
        for &shard_id in &replica_shards {
            self.replicate_to_shard(shard_id, ReplicationOp::AddNode(node.clone()))
                .await?;
        }
        self.node_replicas.insert(
            node.id.clone(),
            replica_shards.iter().map(|s| s.to_string()).collect(),
        );
        Ok(())
    }

    /// Current observed degree of `node_id` (0 if never seen).
    fn get_node_degree(&self, node_id: &NodeId) -> usize {
        self.high_degree_nodes
            .get(node_id)
            .map(|d| *d.value())
            .unwrap_or(0)
    }

    /// Increments the degree counter for `node_id`, creating it at 1.
    fn increment_node_degree(&self, node_id: &NodeId) {
        self.high_degree_nodes
            .entry(node_id.clone())
            .and_modify(|d| *d += 1)
            .or_insert(1);
    }

    /// Returns the replica set for `shard_id`, if initialized.
    pub fn get_replica_set(&self, shard_id: ShardId) -> Option<Arc<ReplicaSet>> {
        self.replica_sets
            .get(&shard_id)
            .map(|r| Arc::clone(r.value()))
    }

    /// Returns the sync manager for `shard_id`, if initialized.
    pub fn get_sync_manager(&self, shard_id: ShardId) -> Option<Arc<SyncManager>> {
        self.sync_managers
            .get(&shard_id)
            .map(|s| Arc::clone(s.value()))
    }

    /// Snapshot of replication counters for monitoring.
    pub fn get_stats(&self) -> ReplicationStats {
        ReplicationStats {
            total_shards: self.replica_sets.len(),
            high_degree_nodes: self.high_degree_nodes.len(),
            replicated_nodes: self.node_replicas.len(),
            strategy: self.config.strategy,
        }
    }

    /// Reports per-shard replica health.
    ///
    /// NOTE(review): the healthy-replica count is a placeholder that assumes
    /// every configured replica is up; it should be derived from the actual
    /// `ReplicaSet` state once that API is wired in.
    pub async fn health_check(&self) -> HashMap<ShardId, ReplicaHealth> {
        let mut health = HashMap::new();
        for entry in self.replica_sets.iter() {
            let shard_id = *entry.key();
            // Placeholder: assume all configured replicas are healthy.
            let healthy_count = self.config.replication_factor;
            health.insert(
                shard_id,
                ReplicaHealth {
                    total_replicas: self.config.replication_factor,
                    healthy_replicas: healthy_count,
                    // Healthy when a majority quorum of replicas is up.
                    is_healthy: healthy_count >= (self.config.replication_factor / 2 + 1),
                },
            );
        }
        health
    }

    /// Read-only access to the active configuration.
    pub fn config(&self) -> &GraphReplicationConfig {
        &self.config
    }
}
/// A single graph mutation to be shipped to replicas.
// NOTE(review): encoded with `bincode::encode_to_vec` in `replicate_to_shard`
// but only derives serde traits — verify the bincode serde integration is in use.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ReplicationOp {
    AddNode(NodeData),
    AddEdge(EdgeData),
    DeleteNode(NodeId),
    DeleteEdge(String),
    // The two update variants are not constructed anywhere in this module yet.
    UpdateNode(NodeData),
    UpdateEdge(EdgeData),
}
/// Monitoring snapshot produced by `GraphReplication::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStats {
    /// Number of shards with initialized replication.
    pub total_shards: usize,
    /// Number of nodes with a tracked degree counter.
    pub high_degree_nodes: usize,
    /// Number of nodes replicated to multiple shards (VertexCut).
    pub replicated_nodes: usize,
    /// Active replication strategy.
    pub strategy: ReplicationStrategy,
}
/// Per-shard health summary returned by `GraphReplication::health_check`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaHealth {
    /// Configured replica count for the shard.
    pub total_replicas: usize,
    /// Replicas believed healthy (currently a placeholder equal to the total).
    pub healthy_replicas: usize,
    /// True when a majority quorum of replicas is healthy.
    pub is_healthy: bool,
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Initializing a shard makes its replica set and sync manager retrievable.
    #[tokio::test]
    async fn test_graph_replication() {
        let repl = GraphReplication::new(GraphReplicationConfig::default());
        repl.initialize_shard_replication(0, "node-1".to_string(), vec!["node-2".to_string()])
            .unwrap();
        assert!(repl.get_replica_set(0).is_some());
        assert!(repl.get_sync_manager(0).is_some());
    }

    /// Adding a node on an initialized shard replicates without error.
    #[tokio::test]
    async fn test_node_replication() {
        let repl = GraphReplication::new(GraphReplicationConfig::default());
        repl.initialize_shard_replication(0, "node-1".to_string(), vec!["node-2".to_string()])
            .unwrap();
        let node = NodeData {
            id: "test-node".to_string(),
            properties: HashMap::new(),
            labels: vec!["Test".to_string()],
        };
        assert!(repl.replicate_node_add(0, node).await.is_ok());
    }

    /// A fresh coordinator reports zero shards and the default strategy.
    #[test]
    fn test_replication_stats() {
        let stats = GraphReplication::new(GraphReplicationConfig::default()).get_stats();
        assert_eq!(stats.total_shards, 0);
        assert_eq!(stats.strategy, ReplicationStrategy::FullShard);
    }
}